Merge branch 'master' into release-2.0
This commit is contained in:
@@ -46,7 +46,7 @@ serde = { version = "1", features = ["derive"] }
|
|||||||
rayon = "1.3.0"
|
rayon = "1.3.0"
|
||||||
rayon-core = "1.7.0"
|
rayon-core = "1.7.0"
|
||||||
chrono = "0.4.11"
|
chrono = "0.4.11"
|
||||||
env_logger = "0.7.1"
|
env_logger = "0.8.1"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "smoke-test"
|
name = "smoke-test"
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, Cursor, ErrorKind, Read};
|
use std::io::{self, Cursor, ErrorKind, Read};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
use chunked_transfer::Decoder as ChunkDecoder;
|
use chunked_transfer::Decoder as ChunkDecoder;
|
||||||
|
|
||||||
@@ -47,7 +46,6 @@ pub struct Response {
|
|||||||
headers: Vec<Header>,
|
headers: Vec<Header>,
|
||||||
unit: Option<Unit>,
|
unit: Option<Unit>,
|
||||||
stream: Option<Stream>,
|
stream: Option<Stream>,
|
||||||
deadline: Option<Instant>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// index into status_line where we split: HTTP/1.1 200 OK
|
/// index into status_line where we split: HTTP/1.1 200 OK
|
||||||
@@ -267,12 +265,17 @@ impl Response {
|
|||||||
|
|
||||||
let stream = self.stream.expect("No reader in response?!");
|
let stream = self.stream.expect("No reader in response?!");
|
||||||
let unit = self.unit;
|
let unit = self.unit;
|
||||||
|
if let Some(unit) = &unit {
|
||||||
|
let result = stream.set_read_timeout(unit.req.agent.config.timeout_read);
|
||||||
|
if let Err(e) = result {
|
||||||
|
return Box::new(ErrorReader(e)) as Box<dyn Read + Send>;
|
||||||
|
}
|
||||||
|
}
|
||||||
let deadline = unit.as_ref().and_then(|u| u.deadline);
|
let deadline = unit.as_ref().and_then(|u| u.deadline);
|
||||||
let stream = DeadlineStream::new(stream, deadline);
|
let stream = DeadlineStream::new(stream, deadline);
|
||||||
|
|
||||||
match (use_chunked, limit_bytes) {
|
match (use_chunked, limit_bytes) {
|
||||||
(true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream)))
|
(true, _) => Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(stream))),
|
||||||
as Box<dyn Read + Send>,
|
|
||||||
(false, Some(len)) => {
|
(false, Some(len)) => {
|
||||||
Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len)))
|
Box::new(PoolReturnRead::new(unit, LimitedRead::new(stream, len)))
|
||||||
}
|
}
|
||||||
@@ -438,7 +441,6 @@ impl Response {
|
|||||||
headers,
|
headers,
|
||||||
unit: None,
|
unit: None,
|
||||||
stream: None,
|
stream: None,
|
||||||
deadline: None,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -507,9 +509,6 @@ impl FromStr for Response {
|
|||||||
/// *Internal API*
|
/// *Internal API*
|
||||||
pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option<Unit>, stream: Stream) {
|
pub(crate) fn set_stream(resp: &mut Response, url: String, unit: Option<Unit>, stream: Stream) {
|
||||||
resp.url = Some(url);
|
resp.url = Some(url);
|
||||||
if let Some(unit) = &unit {
|
|
||||||
resp.deadline = unit.deadline;
|
|
||||||
}
|
|
||||||
resp.unit = unit;
|
resp.unit = unit;
|
||||||
resp.stream = Some(stream);
|
resp.stream = Some(stream);
|
||||||
}
|
}
|
||||||
@@ -730,3 +729,14 @@ mod tests {
|
|||||||
assert!(matches!(err, Error::BadStatus));
|
assert!(matches!(err, Error::BadStatus));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrorReader returns an error for every read.
|
||||||
|
// The error is as close to a clone of the underlying
|
||||||
|
// io::Error as we can get.
|
||||||
|
struct ErrorReader(io::Error);
|
||||||
|
|
||||||
|
impl Read for ErrorReader {
|
||||||
|
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
Err(io::Error::new(self.0.kind(), self.0.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -162,6 +162,14 @@ impl Stream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
|
||||||
|
if let Some(socket) = self.socket() {
|
||||||
|
socket.set_read_timeout(timeout)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn to_write_vec(&self) -> Vec<u8> {
|
pub fn to_write_vec(&self) -> Vec<u8> {
|
||||||
match self {
|
match self {
|
||||||
@@ -324,7 +332,7 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
|
pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
|
||||||
let deadline: Option<Instant> =
|
let connect_deadline: Option<Instant> =
|
||||||
if let Some(timeout_connect) = unit.req.agent.config.timeout_connect {
|
if let Some(timeout_connect) = unit.req.agent.config.timeout_connect {
|
||||||
Instant::now().checked_add(timeout_connect)
|
Instant::now().checked_add(timeout_connect)
|
||||||
} else {
|
} else {
|
||||||
@@ -357,7 +365,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
// Find the first sock_addr that accepts a connection
|
// Find the first sock_addr that accepts a connection
|
||||||
for sock_addr in sock_addrs {
|
for sock_addr in sock_addrs {
|
||||||
// ensure connect timeout or overall timeout aren't yet hit.
|
// ensure connect timeout or overall timeout aren't yet hit.
|
||||||
let timeout = match deadline {
|
let timeout = match connect_deadline {
|
||||||
Some(deadline) => Some(time_until_deadline(deadline)?),
|
Some(deadline) => Some(time_until_deadline(deadline)?),
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
@@ -368,7 +376,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
connect_socks5(
|
connect_socks5(
|
||||||
&unit,
|
&unit,
|
||||||
proxy.clone().unwrap(),
|
proxy.clone().unwrap(),
|
||||||
deadline,
|
connect_deadline,
|
||||||
sock_addr,
|
sock_addr,
|
||||||
hostname,
|
hostname,
|
||||||
port,
|
port,
|
||||||
@@ -394,20 +402,16 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
return Err(err);
|
return Err(err);
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(deadline) = deadline {
|
if let Some(deadline) = unit.deadline {
|
||||||
stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
|
stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
|
||||||
} else if let Some(timeout_read) = unit.req.agent.config.timeout_read {
|
|
||||||
stream.set_read_timeout(Some(timeout_read))?;
|
|
||||||
} else {
|
} else {
|
||||||
stream.set_read_timeout(None)?;
|
stream.set_read_timeout(unit.req.agent.config.timeout_read)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(deadline) = deadline {
|
if let Some(deadline) = unit.deadline {
|
||||||
stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
|
stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
|
||||||
} else if let Some(timeout_write) = unit.req.agent.config.timeout_write {
|
|
||||||
stream.set_write_timeout(Some(timeout_write)).ok();
|
|
||||||
} else {
|
} else {
|
||||||
stream.set_write_timeout(None)?;
|
stream.set_write_timeout(unit.req.agent.config.timeout_write)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if proto == Some(Proto::HTTPConnect) {
|
if proto == Some(Proto::HTTPConnect) {
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ fn dribble_body_respond(mut stream: TcpStream, contents: &[u8]) -> io::Result<()
|
|||||||
stream.write_all(&contents[i..i + 1])?;
|
stream.write_all(&contents[i..i + 1])?;
|
||||||
stream.write_all(&[b'\n'; 1])?;
|
stream.write_all(&[b'\n'; 1])?;
|
||||||
stream.flush()?;
|
stream.flush()?;
|
||||||
thread::sleep(Duration::from_millis(10));
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -47,17 +47,38 @@ fn overall_timeout_during_body() {
|
|||||||
get_and_expect_timeout(url);
|
get_and_expect_timeout(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_timeout_during_body() {
|
||||||
|
let server = TestServer::new(|stream| dribble_body_respond(stream, &[b'a'; 300]));
|
||||||
|
let url = format!("http://localhost:{}/", server.port);
|
||||||
|
let agent = builder().timeout_read(Duration::from_millis(70)).build();
|
||||||
|
let resp = match agent.get(&url).call() {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => panic!("got error during headers, not body: {:?}", e),
|
||||||
|
};
|
||||||
|
match resp.into_string() {
|
||||||
|
Err(io_error) => match io_error.kind() {
|
||||||
|
io::ErrorKind::TimedOut => Ok(()),
|
||||||
|
_ => Err(format!("{:?}", io_error)),
|
||||||
|
},
|
||||||
|
Ok(_) => Err("successful response".to_string()),
|
||||||
|
}
|
||||||
|
.expect("expected timeout but got something else");
|
||||||
|
}
|
||||||
|
|
||||||
// Send HTTP headers on the TcpStream at a rate of one header every 100
|
// Send HTTP headers on the TcpStream at a rate of one header every 100
|
||||||
// milliseconds, for a total of 30 headers.
|
// milliseconds, for a total of 30 headers.
|
||||||
//fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> {
|
fn dribble_headers_respond(mut stream: TcpStream) -> io::Result<()> {
|
||||||
// stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n")?;
|
stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n")?;
|
||||||
// for _ in 0..30 {
|
for _ in 0..30 {
|
||||||
// stream.write_all(b"a: b\n")?;
|
stream.write_all(b"a: b\r\n")?;
|
||||||
// stream.flush()?;
|
stream.flush()?;
|
||||||
// thread::sleep(Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
// }
|
}
|
||||||
// Ok(())
|
stream.write_all(b"\r\n")?;
|
||||||
//}
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
// TODO: Our current behavior is actually incorrect (we'll return BadHeader if a timeout occurs during headers).
|
// TODO: Our current behavior is actually incorrect (we'll return BadHeader if a timeout occurs during headers).
|
||||||
@@ -70,6 +91,35 @@ fn overall_timeout_during_body() {
|
|||||||
// let url = format!("http://localhost:{}/", server.port);
|
// let url = format!("http://localhost:{}/", server.port);
|
||||||
// get_and_expect_timeout(url);
|
// get_and_expect_timeout(url);
|
||||||
//}
|
//}
|
||||||
|
#[test]
|
||||||
|
fn read_timeout_during_headers() {
|
||||||
|
let server = TestServer::new(dribble_headers_respond);
|
||||||
|
let url = format!("http://localhost:{}/", server.port);
|
||||||
|
let agent = builder().timeout_read(Duration::from_millis(10)).build();
|
||||||
|
let resp = agent.get(&url).call();
|
||||||
|
match resp {
|
||||||
|
Ok(_) => Err("successful response".to_string()),
|
||||||
|
Err(Error::Io(e)) if e.kind() == io::ErrorKind::TimedOut => Ok(()),
|
||||||
|
Err(e) => Err(format!("Unexpected error type: {:?}", e)),
|
||||||
|
}
|
||||||
|
.expect("expected timeout but got something else");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn overall_timeout_during_headers() {
|
||||||
|
// Start a test server on an available port, that dribbles out a response at 1 write per 10ms.
|
||||||
|
let server = TestServer::new(dribble_headers_respond);
|
||||||
|
let url = format!("http://localhost:{}/", server.port);
|
||||||
|
let agent = builder().timeout(Duration::from_millis(500)).build();
|
||||||
|
let resp = agent.get(&url).call();
|
||||||
|
match resp {
|
||||||
|
Ok(_) => Err("successful response".to_string()),
|
||||||
|
Err(Error::Io(e)) if e.kind() == io::ErrorKind::TimedOut => Ok(()),
|
||||||
|
Err(e) => Err(format!("Unexpected error type: {:?}", e)),
|
||||||
|
}
|
||||||
|
.expect("expected timeout but got something else");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[cfg(feature = "json")]
|
#[cfg(feature = "json")]
|
||||||
fn overall_timeout_reading_json() {
|
fn overall_timeout_reading_json() {
|
||||||
|
|||||||
Reference in New Issue
Block a user