Turn Unit into a built Request (#223)

This involved removing the Request reference from Unit, and adding an
Agent, a method, and headers.

Also, move is_retryable to Unit.
This commit is contained in:
Jacob Hoffman-Andrews
2020-11-14 01:12:01 -08:00
committed by GitHub
parent acc36ac370
commit ec8dace1af
6 changed files with 82 additions and 75 deletions

View File

@@ -330,8 +330,8 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
stream.reset()?; stream.reset()?;
// insert back into pool // insert back into pool
let key = PoolKey::new(&unit.url, unit.req.agent.config.proxy.clone()); let key = PoolKey::new(&unit.url, unit.agent.config.proxy.clone());
unit.req.agent.state.pool.add(key, stream); unit.agent.state.pool.add(key, stream);
} }
Ok(()) Ok(())

View File

@@ -4,8 +4,7 @@ use std::io::Read;
use url::{form_urlencoded, Url}; use url::{form_urlencoded, Url};
use crate::agent::Agent; use crate::agent::Agent;
use crate::body::BodySize; use crate::body::Payload;
use crate::body::{Payload, SizedReader};
use crate::error::Error; use crate::error::Error;
use crate::header::{self, Header}; use crate::header::{self, Header};
use crate::unit::{self, Unit}; use crate::unit::{self, Unit};
@@ -27,11 +26,11 @@ pub type Result<T> = std::result::Result<T, Error>;
/// ``` /// ```
#[derive(Clone)] #[derive(Clone)]
pub struct Request { pub struct Request {
pub(crate) agent: Agent, agent: Agent,
pub(crate) method: String, method: String,
url: String, url: String,
return_error_for_status: bool, return_error_for_status: bool,
pub(crate) headers: Vec<Header>, headers: Vec<Header>,
query_params: Vec<(String, String)>, query_params: Vec<(String, String)>,
} }
@@ -86,8 +85,8 @@ impl Request {
url.query_pairs_mut().append_pair(&name, &value); url.query_pairs_mut().append_pair(&name, &value);
} }
let reader = payload.into_read(); let reader = payload.into_read();
let unit = Unit::new(&self, &url, &reader); let unit = Unit::new(&self.agent, &self.method, &url, &self.headers, &reader);
let response = unit::connect(&self, unit, true, 0, reader, false)?; let response = unit::connect(unit, true, 0, reader, false)?;
if response.error() && self.return_error_for_status { if response.error() && self.return_error_for_status {
Err(Error::HTTP(response.into())) Err(Error::HTTP(response.into()))
@@ -314,28 +313,6 @@ impl Request {
self.return_error_for_status = value; self.return_error_for_status = value;
self self
} }
// Returns true if this request, with the provided body, is retryable.
pub(crate) fn is_retryable(&self, body: &SizedReader) -> bool {
// Per https://tools.ietf.org/html/rfc7231#section-8.1.3
// these methods are idempotent.
let idempotent = match self.method.as_str() {
"DELETE" | "GET" | "HEAD" | "OPTIONS" | "PUT" | "TRACE" => true,
_ => false,
};
// Unsized bodies aren't retryable because we can't rewind the reader.
// Sized bodies are retryable only if they are zero-length because of
// coincidences of the current implementation - the function responsible
// for retries doesn't have a way to replay a Payload.
let retryable_body = match body.size {
BodySize::Unknown => false,
BodySize::Known(0) => true,
BodySize::Known(_) => false,
BodySize::Empty => true,
};
idempotent && retryable_body
}
} }
#[test] #[test]

View File

@@ -266,7 +266,7 @@ impl Response {
let stream = self.stream.expect("No reader in response?!"); let stream = self.stream.expect("No reader in response?!");
let unit = self.unit; let unit = self.unit;
if let Some(unit) = &unit { if let Some(unit) = &unit {
let result = stream.set_read_timeout(unit.req.agent.config.timeout_read); let result = stream.set_read_timeout(unit.agent.config.timeout_read);
if let Err(e) = result { if let Err(e) = result {
return Box::new(ErrorReader(e)) as Box<dyn Read + Send>; return Box::new(ErrorReader(e)) as Box<dyn Read + Send>;
} }

View File

@@ -315,7 +315,6 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error
let sni = webpki::DNSNameRef::try_from_ascii_str(hostname) let sni = webpki::DNSNameRef::try_from_ascii_str(hostname)
.map_err(|err| Error::DnsFailed(err.to_string()))?; .map_err(|err| Error::DnsFailed(err.to_string()))?;
let tls_conf: &Arc<rustls::ClientConfig> = unit let tls_conf: &Arc<rustls::ClientConfig> = unit
.req
.agent .agent
.config .config
.tls_config .tls_config
@@ -333,12 +332,12 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error
pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> { pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
let connect_deadline: Option<Instant> = let connect_deadline: Option<Instant> =
if let Some(timeout_connect) = unit.req.agent.config.timeout_connect { if let Some(timeout_connect) = unit.agent.config.timeout_connect {
Instant::now().checked_add(timeout_connect) Instant::now().checked_add(timeout_connect)
} else { } else {
unit.deadline unit.deadline
}; };
let proxy: Option<Proxy> = unit.req.agent.config.proxy.clone(); let proxy: Option<Proxy> = unit.agent.config.proxy.clone();
let netloc = match proxy { let netloc = match proxy {
Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port), Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port),
None => format!("{}:{}", hostname, port), None => format!("{}:{}", hostname, port),
@@ -405,13 +404,13 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
if let Some(deadline) = unit.deadline { if let Some(deadline) = unit.deadline {
stream.set_read_timeout(Some(time_until_deadline(deadline)?))?; stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
} else { } else {
stream.set_read_timeout(unit.req.agent.config.timeout_read)?; stream.set_read_timeout(unit.agent.config.timeout_read)?;
} }
if let Some(deadline) = unit.deadline { if let Some(deadline) = unit.deadline {
stream.set_write_timeout(Some(time_until_deadline(deadline)?))?; stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
} else { } else {
stream.set_write_timeout(unit.req.agent.config.timeout_write)?; stream.set_write_timeout(unit.agent.config.timeout_write)?;
} }
if proto == Some(Proto::HTTPConnect) { if proto == Some(Proto::HTTPConnect) {

View File

@@ -59,7 +59,7 @@ fn redirect_head() {
test::make_response(302, "Go here", vec!["Location: /redirect_head2"], vec![]) test::make_response(302, "Go here", vec!["Location: /redirect_head2"], vec![])
}); });
test::set_handler("/redirect_head2", |unit| { test::set_handler("/redirect_head2", |unit| {
assert_eq!(unit.req.method, "HEAD"); assert_eq!(unit.method, "HEAD");
test::make_response(200, "OK", vec!["x-foo: bar"], vec![]) test::make_response(200, "OK", vec!["x-foo: bar"], vec![])
}); });
let resp = head("test://host/redirect_head1").call().unwrap(); let resp = head("test://host/redirect_head1").call().unwrap();
@@ -75,7 +75,7 @@ fn redirect_get() {
test::make_response(302, "Go here", vec!["Location: /redirect_get2"], vec![]) test::make_response(302, "Go here", vec!["Location: /redirect_get2"], vec![])
}); });
test::set_handler("/redirect_get2", |unit| { test::set_handler("/redirect_get2", |unit| {
assert_eq!(unit.req.method, "GET"); assert_eq!(unit.method, "GET");
assert!(unit.has("Range")); assert!(unit.has("Range"));
assert_eq!(unit.header("Range").unwrap(), "bytes=10-50"); assert_eq!(unit.header("Range").unwrap(), "bytes=10-50");
test::make_response(200, "OK", vec!["x-foo: bar"], vec![]) test::make_response(200, "OK", vec!["x-foo: bar"], vec![])
@@ -119,7 +119,7 @@ fn redirect_post() {
test::make_response(302, "Go here", vec!["Location: /redirect_post2"], vec![]) test::make_response(302, "Go here", vec!["Location: /redirect_post2"], vec![])
}); });
test::set_handler("/redirect_post2", |unit| { test::set_handler("/redirect_post2", |unit| {
assert_eq!(unit.req.method, "GET"); assert_eq!(unit.method, "GET");
test::make_response(200, "OK", vec!["x-foo: bar"], vec![]) test::make_response(200, "OK", vec!["x-foo: bar"], vec![])
}); });
let resp = post("test://host/redirect_post1").call().unwrap(); let resp = post("test://host/redirect_post1").call().unwrap();

View File

@@ -7,33 +7,41 @@ use url::Url;
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
use cookie::Cookie; use cookie::Cookie;
use crate::body::{self, BodySize, Payload, SizedReader};
use crate::header; use crate::header;
use crate::resolve::ArcResolver; use crate::resolve::ArcResolver;
use crate::stream::{self, connect_test, Stream}; use crate::stream::{self, connect_test, Stream};
#[cfg(feature = "cookies")]
use crate::Agent; use crate::Agent;
use crate::{Error, Header, Request, Response}; use crate::{
body::{self, BodySize, Payload, SizedReader},
header::get_header,
};
use crate::{Error, Header, Response};
/// It's a "unit of work". Maybe a bad name for it? /// A Unit is a fully-built Request, ready to execute.
/// ///
/// *Internal API* /// *Internal API*
pub(crate) struct Unit { pub(crate) struct Unit {
pub req: Request, pub agent: Agent,
pub method: String,
pub url: Url, pub url: Url,
pub is_chunked: bool, is_chunked: bool,
pub headers: Vec<Header>, headers: Vec<Header>,
pub deadline: Option<time::Instant>, pub deadline: Option<time::Instant>,
} }
impl Unit { impl Unit {
// //
pub(crate) fn new(req: &Request, url: &Url, body: &SizedReader) -> Self { pub(crate) fn new(
agent: &Agent,
method: &str,
url: &Url,
headers: &Vec<Header>,
body: &SizedReader,
) -> Self {
// //
let (is_transfer_encoding_set, mut is_chunked) = req let (is_transfer_encoding_set, mut is_chunked) = get_header(&headers, "transfer-encoding")
.header("transfer-encoding")
// if the user has set an encoding header, obey that. // if the user has set an encoding header, obey that.
.map(|enc| { .map(|enc| {
let is_transfer_encoding_set = !enc.is_empty(); let is_transfer_encoding_set = !enc.is_empty();
@@ -51,7 +59,7 @@ impl Unit {
// chunking and Content-Length headers are mutually exclusive // chunking and Content-Length headers are mutually exclusive
// also don't write this if the user has set it themselves // also don't write this if the user has set it themselves
if !is_chunked && !req.has("content-length") { if !is_chunked && get_header(&headers, "content-length").is_none() {
// if the payload is of known size (everything besides an unsized reader), set // if the payload is of known size (everything besides an unsized reader), set
// Content-Length, // Content-Length,
// otherwise, use the chunked Transfer-Encoding (only if no other Transfer-Encoding // otherwise, use the chunked Transfer-Encoding (only if no other Transfer-Encoding
@@ -72,25 +80,25 @@ impl Unit {
let username = url.username(); let username = url.username();
let password = url.password().unwrap_or(""); let password = url.password().unwrap_or("");
if (username != "" || password != "") && !req.has("authorization") { if (username != "" || password != "") && get_header(&headers, "authorization").is_none()
{
let encoded = base64::encode(&format!("{}:{}", username, password)); let encoded = base64::encode(&format!("{}:{}", username, password));
extra.push(Header::new("Authorization", &format!("Basic {}", encoded))); extra.push(Header::new("Authorization", &format!("Basic {}", encoded)));
} }
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
extra.extend(extract_cookies(&req.agent, &url).into_iter()); extra.extend(extract_cookies(agent, &url).into_iter());
extra extra
}; };
let headers: Vec<_> = req let headers: Vec<_> = headers
.headers
.iter() .iter()
.chain(extra_headers.iter()) .chain(extra_headers.iter())
.cloned() .cloned()
.collect(); .collect();
let deadline = match req.agent.config.timeout { let deadline = match agent.config.timeout {
None => None, None => None,
Some(timeout) => { Some(timeout) => {
let now = time::Instant::now(); let now = time::Instant::now();
@@ -99,7 +107,8 @@ impl Unit {
}; };
Unit { Unit {
req: req.clone(), agent: agent.clone(),
method: method.to_string(),
url: url.clone(), url: url.clone(),
is_chunked, is_chunked,
headers, headers,
@@ -108,11 +117,11 @@ impl Unit {
} }
pub fn is_head(&self) -> bool { pub fn is_head(&self) -> bool {
self.req.method.eq_ignore_ascii_case("head") self.method.eq_ignore_ascii_case("head")
} }
pub fn resolver(&self) -> ArcResolver { pub fn resolver(&self) -> ArcResolver {
self.req.agent.state.resolver.clone() self.agent.state.resolver.clone()
} }
#[cfg(test)] #[cfg(test)]
@@ -127,11 +136,32 @@ impl Unit {
pub fn all(&self, name: &str) -> Vec<&str> { pub fn all(&self, name: &str) -> Vec<&str> {
header::get_all_headers(&self.headers, name) header::get_all_headers(&self.headers, name)
} }
// Returns true if this request, with the provided body, is retryable.
pub(crate) fn is_retryable(&self, body: &SizedReader) -> bool {
// Per https://tools.ietf.org/html/rfc7231#section-8.1.3
// these methods are idempotent.
let idempotent = match self.method.as_str() {
"DELETE" | "GET" | "HEAD" | "OPTIONS" | "PUT" | "TRACE" => true,
_ => false,
};
// Unsized bodies aren't retryable because we can't rewind the reader.
// Sized bodies are retryable only if they are zero-length because of
// coincidences of the current implementation - the function responsible
// for retries doesn't have a way to replay a Payload.
let retryable_body = match body.size {
BodySize::Unknown => false,
BodySize::Known(0) => true,
BodySize::Known(_) => false,
BodySize::Empty => true,
};
idempotent && retryable_body
}
} }
/// Perform a connection. Used recursively for redirects. /// Perform a connection. Used recursively for redirects.
pub(crate) fn connect( pub(crate) fn connect(
req: &Request,
unit: Unit, unit: Unit,
use_pooled: bool, use_pooled: bool,
redirect_count: u32, redirect_count: u32,
@@ -145,7 +175,7 @@ pub(crate) fn connect(
.host_str() .host_str()
.ok_or(Error::BadUrl("no host".to_string()))?; .ok_or(Error::BadUrl("no host".to_string()))?;
let url = &unit.url; let url = &unit.url;
let method = &unit.req.method; let method = &unit.method;
// open socket // open socket
let (mut stream, is_recycled) = connect_socket(&unit, &host, use_pooled)?; let (mut stream, is_recycled) = connect_socket(&unit, &host, use_pooled)?;
@@ -162,13 +192,13 @@ pub(crate) fn connect(
debug!("retrying request early {} {}", method, url); debug!("retrying request early {} {}", method, url);
// we try to open a new connection, this time there will be // we try to open a new connection, this time there will be
// no connection in the pool. don't use it. // no connection in the pool. don't use it.
return connect(req, unit, false, redirect_count, body, redir); return connect(unit, false, redirect_count, body, redir);
} else { } else {
// not a pooled connection, propagate the error. // not a pooled connection, propagate the error.
return Err(err.into()); return Err(err.into());
} }
} }
let retryable = req.is_retryable(&body); let retryable = unit.is_retryable(&body);
// send the body (which can be empty now depending on redirects) // send the body (which can be empty now depending on redirects)
body::send_body(body, unit.is_chunked, &mut stream)?; body::send_body(body, unit.is_chunked, &mut stream)?;
@@ -191,7 +221,7 @@ pub(crate) fn connect(
Err(err) if err.connection_closed() && retryable && is_recycled => { Err(err) if err.connection_closed() && retryable && is_recycled => {
debug!("retrying request {} {}", method, url); debug!("retrying request {} {}", method, url);
let empty = Payload::Empty.into_read(); let empty = Payload::Empty.into_read();
return connect(req, unit, false, redirect_count, empty, redir); return connect(unit, false, redirect_count, empty, redir);
} }
Err(e) => return Err(e), Err(e) => return Err(e),
Ok(resp) => resp, Ok(resp) => resp,
@@ -202,8 +232,8 @@ pub(crate) fn connect(
save_cookies(&unit, &resp); save_cookies(&unit, &resp);
// handle redirects // handle redirects
if resp.redirect() && req.agent.config.redirects > 0 { if resp.redirect() && unit.agent.config.redirects > 0 {
if redirect_count == req.agent.config.redirects { if redirect_count == unit.agent.config.redirects {
return Err(Error::TooManyRedirects); return Err(Error::TooManyRedirects);
} }
@@ -219,16 +249,18 @@ pub(crate) fn connect(
match resp.status() { match resp.status() {
301 | 302 | 303 => { 301 | 302 | 303 => {
let empty = Payload::Empty.into_read(); let empty = Payload::Empty.into_read();
// recreate the unit to get a new hostname and cookies for the new host.
let mut new_unit = Unit::new(req, &new_url, &empty);
// this is to follow how curl does it. POST, PUT etc change // this is to follow how curl does it. POST, PUT etc change
// to GET on a redirect. // to GET on a redirect.
new_unit.req.method = match &method[..] { let new_method = match &method[..] {
"GET" | "HEAD" => method.to_string(), "GET" | "HEAD" => method.to_string(),
_ => "GET".into(), _ => "GET".into(),
}; };
// recreate the unit to get a new hostname and cookies for the new host.
let new_unit =
Unit::new(&unit.agent, &new_method, &new_url, &unit.headers, &empty);
debug!("redirect {} {} -> {}", resp.status(), url, new_url); debug!("redirect {} {} -> {}", resp.status(), url, new_url);
return connect(req, new_unit, use_pooled, redirect_count + 1, empty, true); return connect(new_unit, use_pooled, redirect_count + 1, empty, true);
} }
_ => (), _ => (),
// reinstate this with expect-100 // reinstate this with expect-100
@@ -273,14 +305,14 @@ fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stre
_ => return Err(Error::UnknownScheme(unit.url.scheme().to_string())), _ => return Err(Error::UnknownScheme(unit.url.scheme().to_string())),
}; };
if use_pooled { if use_pooled {
let agent = &unit.req.agent; let agent = &unit.agent;
// The connection may have been closed by the server // The connection may have been closed by the server
// due to idle timeout while it was sitting in the pool. // due to idle timeout while it was sitting in the pool.
// Loop until we find one that is still good or run out of connections. // Loop until we find one that is still good or run out of connections.
while let Some(stream) = agent while let Some(stream) = agent
.state .state
.pool .pool
.try_get_connection(&unit.url, unit.req.agent.config.proxy.clone()) .try_get_connection(&unit.url, unit.agent.config.proxy.clone())
{ {
let server_closed = stream.server_closed()?; let server_closed = stream.server_closed()?;
if !server_closed { if !server_closed {
@@ -309,7 +341,7 @@ fn send_prelude(unit: &Unit, stream: &mut Stream, redir: bool) -> io::Result<()>
write!( write!(
prelude, prelude,
"{} {}{}{} HTTP/1.1\r\n", "{} {}{}{} HTTP/1.1\r\n",
unit.req.method, unit.method,
unit.url.path(), unit.url.path(),
if unit.url.query().is_some() { "?" } else { "" }, if unit.url.query().is_some() { "?" } else { "" },
unit.url.query().unwrap_or_default(), unit.url.query().unwrap_or_default(),
@@ -380,8 +412,7 @@ fn save_cookies(unit: &Unit, resp: &Response) {
Ok(c) => Some(c), Ok(c) => Some(c),
} }
}); });
unit.req unit.agent
.agent
.state .state
.cookie_tin .cookie_tin
.store_response_cookies(cookies, &unit.url.clone()); .store_response_cookies(cookies, &unit.url.clone());