From ec8dace1af525cc69cae7f01b038be911a791aa5 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Sat, 14 Nov 2020 01:12:01 -0800 Subject: [PATCH] Turn Unit into a built Request (#223) This involved removing the Request reference from Unit, and adding an Agent, a method, and headers. Also, move is_retryable to Unit. --- src/pool.rs | 4 +- src/request.rs | 35 +++------------ src/response.rs | 2 +- src/stream.rs | 9 ++-- src/test/redirect.rs | 6 +-- src/unit.rs | 101 ++++++++++++++++++++++++++++--------------- 6 files changed, 82 insertions(+), 75 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index b473c83..6f732b3 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -330,8 +330,8 @@ impl> PoolReturnRead { stream.reset()?; // insert back into pool - let key = PoolKey::new(&unit.url, unit.req.agent.config.proxy.clone()); - unit.req.agent.state.pool.add(key, stream); + let key = PoolKey::new(&unit.url, unit.agent.config.proxy.clone()); + unit.agent.state.pool.add(key, stream); } Ok(()) diff --git a/src/request.rs b/src/request.rs index 7315898..4bb2015 100644 --- a/src/request.rs +++ b/src/request.rs @@ -4,8 +4,7 @@ use std::io::Read; use url::{form_urlencoded, Url}; use crate::agent::Agent; -use crate::body::BodySize; -use crate::body::{Payload, SizedReader}; +use crate::body::Payload; use crate::error::Error; use crate::header::{self, Header}; use crate::unit::{self, Unit}; @@ -27,11 +26,11 @@ pub type Result = std::result::Result; /// ``` #[derive(Clone)] pub struct Request { - pub(crate) agent: Agent, - pub(crate) method: String, + agent: Agent, + method: String, url: String, return_error_for_status: bool, - pub(crate) headers: Vec
, + headers: Vec
, query_params: Vec<(String, String)>, } @@ -86,8 +85,8 @@ impl Request { url.query_pairs_mut().append_pair(&name, &value); } let reader = payload.into_read(); - let unit = Unit::new(&self, &url, &reader); - let response = unit::connect(&self, unit, true, 0, reader, false)?; + let unit = Unit::new(&self.agent, &self.method, &url, &self.headers, &reader); + let response = unit::connect(unit, true, 0, reader, false)?; if response.error() && self.return_error_for_status { Err(Error::HTTP(response.into())) @@ -314,28 +313,6 @@ impl Request { self.return_error_for_status = value; self } - - // Returns true if this request, with the provided body, is retryable. - pub(crate) fn is_retryable(&self, body: &SizedReader) -> bool { - // Per https://tools.ietf.org/html/rfc7231#section-8.1.3 - // these methods are idempotent. - let idempotent = match self.method.as_str() { - "DELETE" | "GET" | "HEAD" | "OPTIONS" | "PUT" | "TRACE" => true, - _ => false, - }; - // Unsized bodies aren't retryable because we can't rewind the reader. - // Sized bodies are retryable only if they are zero-length because of - // coincidences of the current implementation - the function responsible - // for retries doesn't have a way to replay a Payload. - let retryable_body = match body.size { - BodySize::Unknown => false, - BodySize::Known(0) => true, - BodySize::Known(_) => false, - BodySize::Empty => true, - }; - - idempotent && retryable_body - } } #[test] diff --git a/src/response.rs b/src/response.rs index cc91933..8a492bd 100644 --- a/src/response.rs +++ b/src/response.rs @@ -266,7 +266,7 @@ impl Response { let stream = self.stream.expect("No reader in response?!"); let unit = self.unit; if let Some(unit) = &unit { - let result = stream.set_read_timeout(unit.req.agent.config.timeout_read); + let result = stream.set_read_timeout(unit.agent.config.timeout_read); if let Err(e) = result { return Box::new(ErrorReader(e)) as Box; } diff --git a/src/stream.rs b/src/stream.rs index f8372a6..65d7d9e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -315,7 +315,6 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result = unit - .req .agent .config .tls_config @@ -333,12 +332,12 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result Result { let connect_deadline: Option = - if let Some(timeout_connect) = unit.req.agent.config.timeout_connect { + if let Some(timeout_connect) = unit.agent.config.timeout_connect { Instant::now().checked_add(timeout_connect) } else { unit.deadline }; - let proxy: Option = unit.req.agent.config.proxy.clone(); + let proxy: Option = unit.agent.config.proxy.clone(); let netloc = match proxy { Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port), None => format!("{}:{}", hostname, port), @@ -405,13 +404,13 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result, + is_chunked: bool, + headers: Vec
, pub deadline: Option, } impl Unit { // - pub(crate) fn new(req: &Request, url: &Url, body: &SizedReader) -> Self { + pub(crate) fn new( + agent: &Agent, + method: &str, + url: &Url, + headers: &Vec
, + body: &SizedReader, + ) -> Self { // - let (is_transfer_encoding_set, mut is_chunked) = req - .header("transfer-encoding") + let (is_transfer_encoding_set, mut is_chunked) = get_header(&headers, "transfer-encoding") // if the user has set an encoding header, obey that. .map(|enc| { let is_transfer_encoding_set = !enc.is_empty(); @@ -51,7 +59,7 @@ impl Unit { // chunking and Content-Length headers are mutually exclusive // also don't write this if the user has set it themselves - if !is_chunked && !req.has("content-length") { + if !is_chunked && get_header(&headers, "content-length").is_none() { // if the payload is of known size (everything beside an unsized reader), set // Content-Length, // otherwise, use the chunked Transfer-Encoding (only if no other Transfer-Encoding @@ -72,25 +80,25 @@ impl Unit { let username = url.username(); let password = url.password().unwrap_or(""); - if (username != "" || password != "") && !req.has("authorization") { + if (username != "" || password != "") && get_header(&headers, "authorization").is_none() + { let encoded = base64::encode(&format!("{}:{}", username, password)); extra.push(Header::new("Authorization", &format!("Basic {}", encoded))); } #[cfg(feature = "cookies")] - extra.extend(extract_cookies(&req.agent, &url).into_iter()); + extra.extend(extract_cookies(agent, &url).into_iter()); extra }; - let headers: Vec<_> = req - .headers + let headers: Vec<_> = headers .iter() .chain(extra_headers.iter()) .cloned() .collect(); - let deadline = match req.agent.config.timeout { + let deadline = match agent.config.timeout { None => None, Some(timeout) => { let now = time::Instant::now(); @@ -99,7 +107,8 @@ impl Unit { }; Unit { - req: req.clone(), + agent: agent.clone(), + method: method.to_string(), url: url.clone(), is_chunked, headers, @@ -108,11 +117,11 @@ impl Unit { } pub fn is_head(&self) -> bool { - self.req.method.eq_ignore_ascii_case("head") + self.method.eq_ignore_ascii_case("head") } pub fn resolver(&self) -> ArcResolver { - self.req.agent.state.resolver.clone() + self.agent.state.resolver.clone() } #[cfg(test)] @@ -127,11 +136,32 @@ impl Unit { pub fn all(&self, name: &str) -> Vec<&str> { header::get_all_headers(&self.headers, name) } + + // Returns true if this request, with the provided body, is retryable. + pub(crate) fn is_retryable(&self, body: &SizedReader) -> bool { + // Per https://tools.ietf.org/html/rfc7231#section-8.1.3 + // these methods are idempotent. + let idempotent = match self.method.as_str() { + "DELETE" | "GET" | "HEAD" | "OPTIONS" | "PUT" | "TRACE" => true, + _ => false, + }; + // Unsized bodies aren't retryable because we can't rewind the reader. + // Sized bodies are retryable only if they are zero-length because of + // coincidences of the current implementation - the function responsible + // for retries doesn't have a way to replay a Payload. + let retryable_body = match body.size { + BodySize::Unknown => false, + BodySize::Known(0) => true, + BodySize::Known(_) => false, + BodySize::Empty => true, + }; + + idempotent && retryable_body + } } /// Perform a connection. Used recursively for redirects. pub(crate) fn connect( - req: &Request, unit: Unit, use_pooled: bool, redirect_count: u32, @@ -145,7 +175,7 @@ pub(crate) fn connect( .host_str() .ok_or(Error::BadUrl("no host".to_string()))?; let url = &unit.url; - let method = &unit.req.method; + let method = &unit.method; // open socket let (mut stream, is_recycled) = connect_socket(&unit, &host, use_pooled)?; @@ -162,13 +192,13 @@ pub(crate) fn connect( debug!("retrying request early {} {}", method, url); // we try open a new connection, this time there will be // no connection in the pool. don't use it. - return connect(req, unit, false, redirect_count, body, redir); + return connect(unit, false, redirect_count, body, redir); } else { // not a pooled connection, propagate the error. return Err(err.into()); } } - let retryable = req.is_retryable(&body); + let retryable = unit.is_retryable(&body); // send the body (which can be empty now depending on redirects) body::send_body(body, unit.is_chunked, &mut stream)?; @@ -191,7 +221,7 @@ pub(crate) fn connect( Err(err) if err.connection_closed() && retryable && is_recycled => { debug!("retrying request {} {}", method, url); let empty = Payload::Empty.into_read(); - return connect(req, unit, false, redirect_count, empty, redir); + return connect(unit, false, redirect_count, empty, redir); } Err(e) => return Err(e), Ok(resp) => resp, @@ -202,8 +232,8 @@ pub(crate) fn connect( save_cookies(&unit, &resp); // handle redirects - if resp.redirect() && req.agent.config.redirects > 0 { - if redirect_count == req.agent.config.redirects { + if resp.redirect() && unit.agent.config.redirects > 0 { + if redirect_count == unit.agent.config.redirects { return Err(Error::TooManyRedirects); } @@ -219,16 +249,18 @@ pub(crate) fn connect( match resp.status() { 301 | 302 | 303 => { let empty = Payload::Empty.into_read(); - // recreate the unit to get a new hostname and cookies for the new host. - let mut new_unit = Unit::new(req, &new_url, &empty); // this is to follow how curl does it. POST, PUT etc change // to GET on a redirect. - new_unit.req.method = match &method[..] { + let new_method = match &method[..] { "GET" | "HEAD" => method.to_string(), _ => "GET".into(), }; + // recreate the unit to get a new hostname and cookies for the new host. + let new_unit = + Unit::new(&unit.agent, &new_method, &new_url, &unit.headers, &empty); + debug!("redirect {} {} -> {}", resp.status(), url, new_url); - return connect(req, new_unit, use_pooled, redirect_count + 1, empty, true); + return connect(new_unit, use_pooled, redirect_count + 1, empty, true); } _ => (), // reinstate this with expect-100 @@ -273,14 +305,14 @@ fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stre _ => return Err(Error::UnknownScheme(unit.url.scheme().to_string())), }; if use_pooled { - let agent = &unit.req.agent; + let agent = &unit.agent; // The connection may have been closed by the server // due to idle timeout while it was sitting in the pool. // Loop until we find one that is still good or run out of connections. while let Some(stream) = agent .state .pool - .try_get_connection(&unit.url, unit.req.agent.config.proxy.clone()) + .try_get_connection(&unit.url, unit.agent.config.proxy.clone()) { let server_closed = stream.server_closed()?; if !server_closed { @@ -309,7 +341,7 @@ fn send_prelude(unit: &Unit, stream: &mut Stream, redir: bool) -> io::Result<()> write!( prelude, "{} {}{}{} HTTP/1.1\r\n", - unit.req.method, + unit.method, unit.url.path(), if unit.url.query().is_some() { "?" } else { "" }, unit.url.query().unwrap_or_default(), @@ -380,8 +412,7 @@ fn save_cookies(unit: &Unit, resp: &Response) { Ok(c) => Some(c), } }); - unit.req - .agent + unit.agent .state .cookie_tin .store_response_cookies(cookies, &unit.url.clone());