Reinstate read timeouts on body.

This feature was broken in #67, which reset timeouts on the
stream before passing it to set_stream.

As part of this change, refactor the internal storage of
timeouts on the Request object to use Option<Duration>.

Remove the deadline field on Response. It wasn't used. The
deadline field on unit was used instead.

Add a unittest.
This commit is contained in:
Jacob Hoffman-Andrews
2020-10-21 20:39:34 -07:00
committed by Martin Algesten
parent 32f9ebc04a
commit 2bf9362eff
4 changed files with 56 additions and 24 deletions

View File

@@ -1,8 +1,8 @@
use std::fmt;
use std::io::Read; use std::io::Read;
use std::result::Result; use std::result::Result;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time; use std::time;
use std::{fmt, time::Duration};
use qstring::QString; use qstring::QString;
use url::{form_urlencoded, Url}; use url::{form_urlencoded, Url};
@@ -40,8 +40,8 @@ pub struct Request {
pub(crate) headers: Vec<Header>, pub(crate) headers: Vec<Header>,
pub(crate) query: QString, pub(crate) query: QString,
pub(crate) timeout_connect: u64, pub(crate) timeout_connect: u64,
pub(crate) timeout_read: u64, pub(crate) timeout_read: Option<time::Duration>,
pub(crate) timeout_write: u64, pub(crate) timeout_write: Option<time::Duration>,
pub(crate) timeout: Option<time::Duration>, pub(crate) timeout: Option<time::Duration>,
pub(crate) redirects: u32, pub(crate) redirects: u32,
pub(crate) proxy: Option<Proxy>, pub(crate) proxy: Option<Proxy>,
@@ -368,7 +368,10 @@ impl Request {
/// println!("{:?}", r); /// println!("{:?}", r);
/// ``` /// ```
pub fn timeout_read(&mut self, millis: u64) -> &mut Request { pub fn timeout_read(&mut self, millis: u64) -> &mut Request {
self.timeout_read = millis; match millis {
0 => self.timeout_read = None,
m => self.timeout_read = Some(Duration::from_millis(m)),
}
self self
} }
@@ -385,7 +388,10 @@ impl Request {
/// println!("{:?}", r); /// println!("{:?}", r);
/// ``` /// ```
pub fn timeout_write(&mut self, millis: u64) -> &mut Request { pub fn timeout_write(&mut self, millis: u64) -> &mut Request {
self.timeout_write = millis; match millis {
0 => self.timeout_write = None,
m => self.timeout_write = Some(Duration::from_millis(m)),
}
self self
} }

View File

@@ -1,7 +1,6 @@
use std::fmt; use std::fmt;
use std::io::{self, Cursor, ErrorKind, Read}; use std::io::{self, Cursor, ErrorKind, Read};
use std::str::FromStr; use std::str::FromStr;
use std::time::Instant;
use chunked_transfer::Decoder as ChunkDecoder; use chunked_transfer::Decoder as ChunkDecoder;
@@ -56,7 +55,6 @@ pub struct Response {
headers: Vec<Header>, headers: Vec<Header>,
unit: Option<Unit>, unit: Option<Unit>,
stream: Option<Stream>, stream: Option<Stream>,
deadline: Option<Instant>,
} }
/// index into status_line where we split: HTTP/1.1 200 OK /// index into status_line where we split: HTTP/1.1 200 OK
@@ -327,12 +325,17 @@ impl Response {
let stream = self.stream.expect("No reader in response?!"); let stream = self.stream.expect("No reader in response?!");
let unit = self.unit; let unit = self.unit;
if let Some(unit) = &unit {
let result = stream.set_read_timeout(unit.req.timeout_read);
if let Err(e) = result {
return Box::new(ErrorReader(e)) as Box<dyn Read + Send>;
}
}
let deadline = unit.as_ref().and_then(|u| u.deadline); let deadline = unit.as_ref().and_then(|u| u.deadline);
let stream = DeadlineStream::new(stream, deadline); let stream = DeadlineStream::new(stream, deadline);
match (use_chunked, limit_bytes) { match (use_chunked, limit_bytes) {
(true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))) (true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))),
as Box<dyn Read + Send>,
(false, Some(len)) => { (false, Some(len)) => {
Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len))) Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len)))
} }
@@ -505,7 +508,6 @@ impl Response {
headers, headers,
unit: None, unit: None,
stream: None, stream: None,
deadline: None,
}) })
} }
@@ -585,9 +587,6 @@ impl Into<Response> for Error {
/// *Internal API* /// *Internal API*
pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option<Unit>, stream: Stream) { pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option<Unit>, stream: Stream) {
resp.url = Some(url); resp.url = Some(url);
if let Some(unit) = &unit {
resp.deadline = unit.deadline;
}
resp.unit = unit; resp.unit = unit;
resp.stream = Some(stream); resp.stream = Some(stream);
} }
@@ -813,3 +812,14 @@ mod tests {
assert_eq!(v, "Bad Status\n"); assert_eq!(v, "Bad Status\n");
} }
} }
// ErrorReader returns an error for every read.
// The error is as close to a clone of the underlying
// io::Error as we can get.
struct ErrorReader(io::Error);
impl Read for ErrorReader {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
Err(io::Error::new(self.0.kind(), self.0.to_string()))
}
}

View File

@@ -172,6 +172,14 @@ impl Stream {
} }
} }
pub(crate) fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
if let Some(socket) = self.socket() {
socket.set_read_timeout(timeout)
} else {
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
pub fn to_write_vec(&self) -> Vec<u8> { pub fn to_write_vec(&self) -> Vec<u8> {
match self { match self {
@@ -453,24 +461,16 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
stream stream
.set_read_timeout(Some(time_until_deadline(deadline)?)) .set_read_timeout(Some(time_until_deadline(deadline)?))
.ok(); .ok();
} else if unit.req.timeout_read > 0 {
stream
.set_read_timeout(Some(Duration::from_millis(unit.req.timeout_read as u64)))
.ok();
} else { } else {
stream.set_read_timeout(None).ok(); stream.set_read_timeout(unit.req.timeout_read)?;
} }
if let Some(deadline) = deadline { if let Some(deadline) = deadline {
stream stream
.set_write_timeout(Some(time_until_deadline(deadline)?)) .set_write_timeout(Some(time_until_deadline(deadline)?))
.ok(); .ok();
} else if unit.req.timeout_write > 0 {
stream
.set_write_timeout(Some(Duration::from_millis(unit.req.timeout_write as u64)))
.ok();
} else { } else {
stream.set_write_timeout(None).ok(); stream.set_read_timeout(unit.req.timeout_read)?;
} }
if proto == Some(Proto::HTTPConnect) { if proto == Some(Proto::HTTPConnect) {

View File

@@ -19,7 +19,7 @@ fn dribble_body_respond(mut stream: TcpStream, contents: &[u8]) -> io::Result<()
stream.write_all(&contents[i..i + 1])?; stream.write_all(&contents[i..i + 1])?;
stream.write_all(&[b'\n'; 1])?; stream.write_all(&[b'\n'; 1])?;
stream.flush()?; stream.flush()?;
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(100));
} }
Ok(()) Ok(())
} }
@@ -47,6 +47,22 @@ fn overall_timeout_during_body() {
get_and_expect_timeout(url); get_and_expect_timeout(url);
} }
#[test]
fn read_timeout_during_body() {
let server = TestServer::new(|stream| dribble_body_respond(stream, &[b'a'; 300]));
let url = format!("http://localhost:{}/", server.port);
let agent = Agent::default().build();
let resp = agent.get(&url).timeout_read(5).call();
match resp.into_string() {
Err(io_error) => match io_error.kind() {
io::ErrorKind::TimedOut => Ok(()),
_ => Err(format!("{:?}", io_error)),
},
Ok(_) => Err("successful response".to_string()),
}
.expect("expected timeout but got something else");
}
// Send HTTP headers on the TcpStream at a rate of one header every 100 // Send HTTP headers on the TcpStream at a rate of one header every 100
// milliseconds, for a total of 30 headers. // milliseconds, for a total of 30 headers.
fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> { fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> {