send_payload -> send_body

This commit is contained in:
Martin Algesten
2018-06-16 11:25:07 +02:00
parent 43585fcdf0
commit 773c82b4c6
3 changed files with 56 additions and 39 deletions

View File

@@ -15,6 +15,8 @@
- [x] Auth headers - [x] Auth headers
- [x] Repeated headers - [x] Repeated headers
- [x] Cookie jar in agent - [x] Cookie jar in agent
- [ ] Write tests for send body content-length
- [ ] Write tests for send body chunked
- [ ] Forms with application/x-www-form-urlencoded - [ ] Forms with application/x-www-form-urlencoded
- [ ] multipart/form-data - [ ] multipart/form-data
- [ ] Connection reuse/keep-alive with pool - [ ] Connection reuse/keep-alive with pool

View File

@@ -18,7 +18,7 @@ impl ConnectionPool {
url: &Url, url: &Url,
redirects: u32, redirects: u32,
mut jar: Option<&mut CookieJar>, mut jar: Option<&mut CookieJar>,
payload: Payload, body: SizedReader,
) -> Result<Response, Error> { ) -> Result<Response, Error> {
// //
@@ -33,6 +33,12 @@ impl ConnectionPool {
}; };
let headers = request.headers.iter().chain(cookie_headers.iter()); let headers = request.headers.iter().chain(cookie_headers.iter());
let do_chunk = request.header("transfer-encoding")
// if the user has set an encoding header, obey that.
.map(|enc| enc.len() > 0)
// otherwise, no chunking.
.unwrap_or(false);
// open socket // open socket
let mut stream = match url.scheme() { let mut stream = match url.scheme() {
"http" => connect_http(request, &url), "http" => connect_http(request, &url),
@@ -50,6 +56,13 @@ impl ConnectionPool {
for header in headers { for header in headers {
write!(prelude, "{}: {}\r\n", header.name(), header.value())?; write!(prelude, "{}: {}\r\n", header.name(), header.value())?;
} }
// chunking and Content-Length headers are mutually exclusive
// also don't write this if the user has set it themselves
if !do_chunk && !request.has("content-length") {
if let Some(size) = body.size {
write!(prelude, "Content-Length: {}\r\n", size)?;
}
}
write!(prelude, "\r\n")?; write!(prelude, "\r\n")?;
stream.write_all(&mut prelude[..])?; stream.write_all(&mut prelude[..])?;
@@ -91,18 +104,19 @@ impl ConnectionPool {
// perform the redirect differently depending on 3xx code. // perform the redirect differently depending on 3xx code.
return match resp.status { return match resp.status {
301 | 302 | 303 => { 301 | 302 | 303 => {
send_payload(&request, payload, &mut stream)?; send_body(body, do_chunk, &mut stream)?;
self.connect(request, "GET", &new_url, redirects - 1, jar, Payload::Empty) let empty = Payload::Empty.into_read();
self.connect(request, "GET", &new_url, redirects - 1, jar, empty)
} }
307 | 308 | _ => { 307 | 308 | _ => {
self.connect(request, method, &new_url, redirects - 1, jar, payload) self.connect(request, method, &new_url, redirects - 1, jar, body)
} }
}; };
} }
} }
// send the payload (which can be empty now depending on redirects) // send the body (which can be empty now depending on redirects)
send_payload(&request, payload, &mut stream)?; send_body(body, do_chunk, &mut stream)?;
// since it is not a redirect, give away the incoming stream to the response object // since it is not a redirect, give away the incoming stream to the response object
resp.set_stream(stream); resp.set_stream(stream);
@@ -112,28 +126,11 @@ impl ConnectionPool {
} }
} }
fn send_body(body: SizedReader, do_chunk: bool, stream: &mut Stream) -> IoResult<()> {
fn send_payload(request: &Request, payload: Payload, stream: &mut Stream) -> IoResult<()> {
//
let (size, reader) = payload.into_read();
let do_chunk = request.header("transfer-encoding")
// if the user has set an encoding header, obey that.
.map(|enc| enc.eq_ignore_ascii_case("chunked"))
// if the content has a size
.ok_or_else(|| size.
// or if the user set a content-length header
or_else(||
request.header("content-length").map(|len| len.parse::<usize>().unwrap_or(0)))
// and that size is larger than 1MB, chunk,
.map(|size| size > CHUNK_SIZE))
// otherwise, assume chunking since it can be really big.
.unwrap_or(true);
if do_chunk { if do_chunk {
pipe(reader, chunked_transfer::Encoder::new(stream))?; pipe(body.reader, chunked_transfer::Encoder::new(stream))?;
} else { } else {
pipe(reader, stream)?; pipe(body.reader, stream)?;
} }
Ok(()) Ok(())

View File

@@ -1,9 +1,9 @@
use qstring::QString;
use super::SerdeValue; use super::SerdeValue;
use std::sync::Arc; use qstring::QString;
use std::io::Cursor;
use std::io::empty;
use serde_json; use serde_json;
use std::io::empty;
use std::io::Cursor;
use std::sync::Arc;
lazy_static! { lazy_static! {
static ref URL_BASE: Url = { Url::parse("http://localhost/").expect("Failed to parse URL_BASE") }; static ref URL_BASE: Url = { Url::parse("http://localhost/").expect("Failed to parse URL_BASE") };
@@ -48,23 +48,34 @@ impl Default for Payload {
} }
} }
struct SizedReader {
size: Option<usize>,
reader: Box<Read + 'static>,
}
impl SizedReader {
fn new(size: Option<usize>, reader: Box<Read + 'static>) -> Self {
SizedReader { size, reader }
}
}
impl Payload { impl Payload {
fn into_read(self) -> (Option<usize>, Box<Read + 'static>) { fn into_read(self) -> SizedReader {
match self { match self {
Payload::Empty => (Some(0), Box::new(empty())), Payload::Empty => SizedReader::new(Some(0), Box::new(empty())),
Payload::Text(s) => { Payload::Text(s) => {
let bytes = s.into_bytes(); let bytes = s.into_bytes();
let len = bytes.len(); let len = bytes.len();
let cursor = Cursor::new(bytes); let cursor = Cursor::new(bytes);
(Some(len), Box::new(cursor)) SizedReader::new(Some(len), Box::new(cursor))
} }
Payload::JSON(v) => { Payload::JSON(v) => {
let bytes = serde_json::to_vec(&v).expect("Bad JSON in payload"); let bytes = serde_json::to_vec(&v).expect("Bad JSON in payload");
let len = bytes.len(); let len = bytes.len();
let cursor = Cursor::new(bytes); let cursor = Cursor::new(bytes);
(Some(len), Box::new(cursor)) SizedReader::new(Some(len), Box::new(cursor))
} }
Payload::Reader(read) => (None, read), Payload::Reader(read) => SizedReader::new(None, read),
} }
} }
} }
@@ -121,15 +132,20 @@ impl Request {
&url, &url,
self.redirects, self.redirects,
None, None,
payload, payload.into_read(),
) )
} else { } else {
// reuse connection pool. // reuse connection pool.
let state = state.as_mut().unwrap(); let state = state.as_mut().unwrap();
let jar = &mut state.jar; let jar = &mut state.jar;
state state.pool.connect(
.pool self,
.connect(self, &self.method, &url, self.redirects, Some(jar), payload) &self.method,
&url,
self.redirects,
Some(jar),
payload.into_read(),
)
} }
}) })
.unwrap_or_else(|e| e.into()) .unwrap_or_else(|e| e.into())
@@ -137,6 +153,8 @@ impl Request {
/// Send data a json value. /// Send data a json value.
/// ///
/// The `Content-Length` header is implicitly set to the length of the serialized value.
///
/// ``` /// ```
/// #[macro_use] /// #[macro_use]
/// extern crate ureq; /// extern crate ureq;