From bd2761e28015c713fc77f55e2bfa933afa5627d1 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 19 Oct 2020 06:43:27 +0000 Subject: [PATCH 1/7] Update env_logger requirement from 0.7.1 to 0.8.1 Updates the requirements on [env_logger](https://github.com/env-logger-rs/env_logger) to permit the latest version. - [Release notes](https://github.com/env-logger-rs/env_logger/releases) - [Changelog](https://github.com/env-logger-rs/env_logger/blob/master/CHANGELOG.md) - [Commits](https://github.com/env-logger-rs/env_logger/compare/v0.7.1...v0.8.1) Signed-off-by: dependabot-preview[bot] --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ac9318a..e449064 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ serde = { version = "1", features = ["derive"] } rayon = "1.3.0" rayon-core = "1.7.0" chrono = "0.4.11" -env_logger = "0.7.1" +env_logger = "0.8.1" [[example]] name = "smoke-test" From 67c28d28a352c832b7357e0eb0a015cc3c3d2155 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Tue, 20 Oct 2020 23:46:54 -0700 Subject: [PATCH 2/7] Check for synthetic_error early in unit::connect --- src/response.rs | 5 +++++ src/unit.rs | 3 +++ 2 files changed, 8 insertions(+) diff --git a/src/response.rs b/src/response.rs index 38f44f1..a768535 100644 --- a/src/response.rs +++ b/src/response.rs @@ -221,6 +221,11 @@ impl Response { &self.error } + // Internal-only API, to allow unit::connect to return early on errors. + pub(crate) fn into_error(self) -> Option { + self.error + } + /// The content type part of the "Content-Type" header without /// the charset. /// diff --git a/src/unit.rs b/src/unit.rs index 3ec4794..0431009 100644 --- a/src/unit.rs +++ b/src/unit.rs @@ -181,6 +181,9 @@ pub(crate) fn connect( // start reading the response to process cookies and redirects. let mut stream = stream::DeadlineStream::new(stream, unit.deadline); let mut resp = Response::from_read(&mut stream); + if resp.synthetic_error().is_some() { + return Err(resp.into_error().unwrap()); + } // https://tools.ietf.org/html/rfc7230#section-6.3.1 // When an inbound connection is closed prematurely, a client MAY From 14475cb5c797714a4158d98deaf3e5c1f8947a5d Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 21 Oct 2020 00:19:30 -0700 Subject: [PATCH 3/7] Add test for read_timeout during headers. --- src/test/timeout.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/test/timeout.rs b/src/test/timeout.rs index cb69055..2553dd9 100644 --- a/src/test/timeout.rs +++ b/src/test/timeout.rs @@ -59,6 +59,18 @@ fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> { Ok(()) } +#[test] +fn read_timeout_during_headers() { + let server = TestServer::new(dribble_headers_respond); + let url = format!("http://localhost:{}/", server.port); + let resp = crate::get(&url).timeout_read(10).call(); + assert!(!resp.ok()); + assert_eq!( + resp.into_string().unwrap(), + "Network Error: timed out reading response\n" + ); +} + #[test] fn overall_timeout_during_headers() { // Start a test server on an available port, that dribbles out a response at 1 write per 10ms. From 1f5f65877ad2911c599a12e79ef794b9e16f2a39 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 21 Oct 2020 10:08:47 -0700 Subject: [PATCH 4/7] Update overall_timeout_during_headers test. --- src/test/timeout.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/test/timeout.rs b/src/test/timeout.rs index 2553dd9..be9654a 100644 --- a/src/test/timeout.rs +++ b/src/test/timeout.rs @@ -28,15 +28,15 @@ fn get_and_expect_timeout(url: String) { let agent = Agent::default().build(); let timeout = Duration::from_millis(500); let resp = agent.get(&url).timeout(timeout).call(); - - match resp.into_string() { - Err(io_error) => match io_error.kind() { - io::ErrorKind::TimedOut => Ok(()), - _ => Err(format!("{:?}", io_error)), - }, - Ok(_) => Err("successful response".to_string()), - } - .expect("expected timeout but got something else"); + assert!( + matches!(resp.synthetic_error(), Some(Error::Io(_))), + "expected timeout error, got {:?}", + resp.synthetic_error() + ); + assert_eq!( + resp.synthetic_error().as_ref().unwrap().body_text(), + "Network Error: timed out reading response" + ); } #[test] From 32f9ebc04ad147a199c6ae753a6b7f7a878f7d8a Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 21 Oct 2020 10:26:00 -0700 Subject: [PATCH 5/7] Separate overall_timeout checks. --- src/test/timeout.rs | 32 +++++++++++++++++++++----------- src/unit.rs | 5 ++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/test/timeout.rs b/src/test/timeout.rs index be9654a..7a296e4 100644 --- a/src/test/timeout.rs +++ b/src/test/timeout.rs @@ -28,15 +28,15 @@ fn get_and_expect_timeout(url: String) { let agent = Agent::default().build(); let timeout = Duration::from_millis(500); let resp = agent.get(&url).timeout(timeout).call(); - assert!( - matches!(resp.synthetic_error(), Some(Error::Io(_))), - "expected timeout error, got {:?}", - resp.synthetic_error() - ); - assert_eq!( - resp.synthetic_error().as_ref().unwrap().body_text(), - "Network Error: timed out reading response" - ); + + match resp.into_string() { + Err(io_error) => match io_error.kind() { + io::ErrorKind::TimedOut => Ok(()), + _ => Err(format!("{:?}", io_error)), + }, + Ok(_) => Err("successful response".to_string()), + } + .expect("expected timeout but got something else"); } #[test] @@ -50,7 +50,6 @@ fn overall_timeout_during_body() { // Send HTTP headers on the TcpStream at a rate of one header every 100 // milliseconds, for a total of 30 headers. fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> { - stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n")?; for _ in 0..30 { stream.write_all(b"a: b\n")?; stream.flush()?; @@ -76,7 +75,18 @@ fn overall_timeout_during_headers() { // Start a test server on an available port, that dribbles out a response at 1 write per 10ms. let server = TestServer::new(dribble_headers_respond); let url = format!("http://localhost:{}/", server.port); - get_and_expect_timeout(url); + let agent = Agent::default().build(); + let timeout = Duration::from_millis(500); + let resp = agent.get(&url).timeout(timeout).call(); + assert!( + matches!(resp.synthetic_error(), Some(Error::Io(_))), + "expected timeout error, got {:?}", + resp.synthetic_error() + ); + assert_eq!( + resp.synthetic_error().as_ref().unwrap().body_text(), + "Network Error: timed out reading response" + ); } #[test] diff --git a/src/unit.rs b/src/unit.rs index 0431009..2957af1 100644 --- a/src/unit.rs +++ b/src/unit.rs @@ -181,9 +181,6 @@ pub(crate) fn connect( // start reading the response to process cookies and redirects. let mut stream = stream::DeadlineStream::new(stream, unit.deadline); let mut resp = Response::from_read(&mut stream); - if resp.synthetic_error().is_some() { - return Err(resp.into_error().unwrap()); - } // https://tools.ietf.org/html/rfc7230#section-6.3.1 // When an inbound connection is closed prematurely, a client MAY @@ -201,6 +198,8 @@ pub(crate) fn connect( let empty = Payload::Empty.into_read(); return connect(req, unit, false, redirect_count, empty, redir); } + // Non-retryable errors return early. + return Err(resp.into_error().unwrap()); } // squirrel away cookies From 2bf9362eff0e77aac7f182effe13844e6171d3c2 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Wed, 21 Oct 2020 20:39:34 -0700 Subject: [PATCH 6/7] Reinstate read timeouts on body. This feature was broken in #67, which reset timeouts on the stream before passing it to set_stream. As part of this change, refactor the internal storage of timeouts on the Request object to use Option. Remove the deadline field on Response. It wasn't used. The deadline field on unit was used instead. Add a unittest. --- src/request.rs | 16 +++++++++++----- src/response.rs | 26 ++++++++++++++++++-------- src/stream.rs | 20 ++++++++++---------- src/test/timeout.rs | 18 +++++++++++++++++- 4 files changed, 56 insertions(+), 24 deletions(-) diff --git a/src/request.rs b/src/request.rs index daee345..7d469fb 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,8 +1,8 @@ -use std::fmt; use std::io::Read; use std::result::Result; use std::sync::{Arc, Mutex}; use std::time; +use std::{fmt, time::Duration}; use qstring::QString; use url::{form_urlencoded, Url}; @@ -40,8 +40,8 @@ pub struct Request { pub(crate) headers: Vec
, pub(crate) query: QString, pub(crate) timeout_connect: u64, - pub(crate) timeout_read: u64, - pub(crate) timeout_write: u64, + pub(crate) timeout_read: Option, + pub(crate) timeout_write: Option, pub(crate) timeout: Option, pub(crate) redirects: u32, pub(crate) proxy: Option, @@ -368,7 +368,10 @@ impl Request { /// println!("{:?}", r); /// ``` pub fn timeout_read(&mut self, millis: u64) -> &mut Request { - self.timeout_read = millis; + match millis { + 0 => self.timeout_read = None, + m => self.timeout_read = Some(Duration::from_millis(m)), + } self } @@ -385,7 +388,10 @@ impl Request { /// println!("{:?}", r); /// ``` pub fn timeout_write(&mut self, millis: u64) -> &mut Request { - self.timeout_write = millis; + match millis { + 0 => self.timeout_write = None, + m => self.timeout_write = Some(Duration::from_millis(m)), + } self } diff --git a/src/response.rs b/src/response.rs index a768535..75ba391 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,7 +1,6 @@ use std::fmt; use std::io::{self, Cursor, ErrorKind, Read}; use std::str::FromStr; -use std::time::Instant; use chunked_transfer::Decoder as ChunkDecoder; @@ -56,7 +55,6 @@ pub struct Response { headers: Vec
, unit: Option, stream: Option, - deadline: Option, } /// index into status_line where we split: HTTP/1.1 200 OK @@ -327,12 +325,17 @@ impl Response { let stream = self.stream.expect("No reader in response?!"); let unit = self.unit; + if let Some(unit) = &unit { + let result = stream.set_read_timeout(unit.req.timeout_read); + if let Err(e) = result { + return Box::new(ErrorReader(e)) as Box; + } + } let deadline = unit.as_ref().and_then(|u| u.deadline); let stream = DeadlineStream::new(stream, deadline); match (use_chunked, limit_bytes) { - (true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))) - as Box, + (true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))), (false, Some(len)) => { Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len))) } @@ -505,7 +508,6 @@ impl Response { headers, unit: None, stream: None, - deadline: None, }) } @@ -585,9 +587,6 @@ impl Into for Error { /// *Internal API* pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option, stream: Stream) { resp.url = Some(url); - if let Some(unit) = &unit { - resp.deadline = unit.deadline; - } resp.unit = unit; resp.stream = Some(stream); } @@ -813,3 +812,14 @@ mod tests { assert_eq!(v, "Bad Status\n"); } } + +// ErrorReader returns an error for every read. +// The error is as close to a clone of the underlying +// io::Error as we can get. +struct ErrorReader(io::Error); + +impl Read for ErrorReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + Err(io::Error::new(self.0.kind(), self.0.to_string())) + } +} diff --git a/src/stream.rs b/src/stream.rs index f14a3ae..cf53c4d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -172,6 +172,14 @@ impl Stream { } } + pub(crate) fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { + if let Some(socket) = self.socket() { + socket.set_read_timeout(timeout) + } else { + Ok(()) + } + } + #[cfg(test)] pub fn to_write_vec(&self) -> Vec { match self { @@ -453,24 +461,16 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result 0 { - stream - .set_read_timeout(Some(Duration::from_millis(unit.req.timeout_read as u64))) - .ok(); } else { - stream.set_read_timeout(None).ok(); + stream.set_read_timeout(unit.req.timeout_read)?; } if let Some(deadline) = deadline { stream .set_write_timeout(Some(time_until_deadline(deadline)?)) .ok(); - } else if unit.req.timeout_write > 0 { - stream - .set_write_timeout(Some(Duration::from_millis(unit.req.timeout_write as u64))) - .ok(); } else { - stream.set_write_timeout(None).ok(); + stream.set_read_timeout(unit.req.timeout_read)?; } if proto == Some(Proto::HTTPConnect) { diff --git a/src/test/timeout.rs b/src/test/timeout.rs index 7a296e4..c9a0821 100644 --- a/src/test/timeout.rs +++ b/src/test/timeout.rs @@ -19,7 +19,7 @@ fn dribble_body_respond(mut stream: TcpStream, contents: &[u8]) -> io::Result<() stream.write_all(&contents[i..i + 1])?; stream.write_all(&[b'\n'; 1])?; stream.flush()?; - thread::sleep(Duration::from_millis(10)); + thread::sleep(Duration::from_millis(100)); } Ok(()) } @@ -47,6 +47,22 @@ fn overall_timeout_during_body() { get_and_expect_timeout(url); } +#[test] +fn read_timeout_during_body() { + let server = TestServer::new(|stream| dribble_body_respond(stream, &[b'a'; 300])); + let url = format!("http://localhost:{}/", server.port); + let agent = Agent::default().build(); + let resp = agent.get(&url).timeout_read(5).call(); + match resp.into_string() { + Err(io_error) => match io_error.kind() { + io::ErrorKind::TimedOut => Ok(()), + _ => Err(format!("{:?}", io_error)), + }, + Ok(_) => Err("successful response".to_string()), + } + .expect("expected timeout but got something else"); +} + // Send HTTP headers on the TcpStream at a rate of one header every 100 // milliseconds, for a total of 30 headers. fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> { From 22e38393403570131221fc9fa8e1f20540130801 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Thu, 22 Oct 2020 00:10:30 -0700 Subject: [PATCH 7/7] Review feedback. --- src/stream.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/stream.rs b/src/stream.rs index cf53c4d..d057e53 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -455,20 +455,14 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result