Fix bug in force-unwrapping when resetting timers

When running tests locally, this error can surface.

```
---- test::agent_test::custom_resolver stdout ----
thread 'test::agent_test::custom_resolver' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 22, kind: InvalidInput, message: "Invalid argument" }', src/stream.rs:60:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```

The problem is that setting the timeouts might fail, and this is done
in a From trait where there is not possibility to "bubble" the
io::Error.

```
socket.set_read_timeout(None).unwrap();
socket.set_write_timeout(None).unwrap();
```

This commit moves the resetting of timers to an explicit `Stream::reset()` fn
that must be called every time we're unwrapping the inner stream.
This commit is contained in:
Martin Algesten
2020-09-28 09:32:50 +02:00
parent 6f70349172
commit 0346794e87
3 changed files with 26 additions and 13 deletions

View File

@@ -379,20 +379,26 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
} }
} }
fn return_connection(&mut self) { fn return_connection(&mut self) -> io::Result<()> {
// guard we only do this once. // guard we only do this once.
if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) { if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) {
let state = &mut unit.req.agent.lock().unwrap(); let state = &mut unit.req.agent.lock().unwrap();
// bring back stream here to either go into pool or dealloc // bring back stream here to either go into pool or dealloc
let stream = reader.into(); let mut stream = reader.into();
if !stream.is_poolable() { if !stream.is_poolable() {
// just let it deallocate // just let it deallocate
return; return Ok(());
} }
// ensure stream can be reused
stream.reset()?;
// insert back into pool // insert back into pool
let key = PoolKey::new(&unit.url, &unit.req.proxy); let key = PoolKey::new(&unit.url, &unit.req.proxy);
state.pool().add(key, stream); state.pool().add(key, stream);
} }
Ok(())
} }
fn do_read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn do_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
@@ -409,7 +415,7 @@ impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
// only if the underlying reader is exhausted can we send a new // only if the underlying reader is exhausted can we send a new
// request to the same socket. hence, we only return it now. // request to the same socket. hence, we only return it now.
if amount == 0 { if amount == 0 {
self.return_connection(); self.return_connection()?;
} }
Ok(amount) Ok(amount)
} }

View File

@@ -53,14 +53,7 @@ impl DeadlineStream {
impl From<DeadlineStream> for Stream { impl From<DeadlineStream> for Stream {
fn from(deadline_stream: DeadlineStream) -> Stream { fn from(deadline_stream: DeadlineStream) -> Stream {
// Since we are turning this back into a regular, non-deadline Stream, deadline_stream.stream
// remove any timeouts we set.
let stream = deadline_stream.stream;
if let Some(socket) = stream.socket() {
socket.set_read_timeout(None).unwrap();
socket.set_write_timeout(None).unwrap();
}
stream
} }
} }
@@ -159,6 +152,17 @@ impl Stream {
} }
} }
pub(crate) fn reset(&mut self) -> io::Result<()> {
// When we are turning this back into a regular, non-deadline Stream,
// remove any timeouts we set.
if let Some(socket) = self.socket() {
socket.set_read_timeout(None)?;
socket.set_write_timeout(None)?;
}
Ok(())
}
pub(crate) fn socket(&self) -> Option<&TcpStream> { pub(crate) fn socket(&self) -> Option<&TcpStream> {
match self { match self {
Stream::Http(b) => Some(b.get_ref()), Stream::Http(b) => Some(b.get_ref()),

View File

@@ -238,9 +238,12 @@ pub(crate) fn connect(
debug!("response {} to {} {}", resp.status(), method, url); debug!("response {} to {} {}", resp.status(), method, url);
let mut stream: Stream = stream.into();
stream.reset()?;
// since it is not a redirect, or we're not following redirects, // since it is not a redirect, or we're not following redirects,
// give away the incoming stream to the response object. // give away the incoming stream to the response object.
crate::response::set_stream(&mut resp, url.to_string(), Some(unit), stream.into()); crate::response::set_stream(&mut resp, unit.url.to_string(), Some(unit), stream);
// release the response // release the response
Ok(resp) Ok(resp)