From 0346794e8735ded6170428dd3aadc8c594910971 Mon Sep 17 00:00:00 2001 From: Martin Algesten Date: Mon, 28 Sep 2020 09:32:50 +0200 Subject: [PATCH] Fix bug in force-unwrapping when resetting timers When running tests locally, this error can surface. ``` ---- test::agent_test::custom_resolver stdout ---- thread 'test::agent_test::custom_resolver' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 22, kind: InvalidInput, message: "Invalid argument" }', src/stream.rs:60:13 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace ``` The problem is that setting the timeouts might fail, and this is done in a From trait where there is not possibility to "bubble" the io::Error. ``` socket.set_read_timeout(None).unwrap(); socket.set_write_timeout(None).unwrap(); ``` This commit moves the resetting of timers to an explicit `Stream::reset()` fn that must be called every time we're unwrapping the inner stream. --- src/pool.rs | 14 ++++++++++---- src/stream.rs | 20 ++++++++++++-------- src/unit.rs | 5 ++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index c984135..711edab 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -379,20 +379,26 @@ impl> PoolReturnRead { } } - fn return_connection(&mut self) { + fn return_connection(&mut self) -> io::Result<()> { // guard we only do this once. if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) { let state = &mut unit.req.agent.lock().unwrap(); // bring back stream here to either go into pool or dealloc - let stream = reader.into(); + let mut stream = reader.into(); if !stream.is_poolable() { // just let it deallocate - return; + return Ok(()); } + + // ensure stream can be reused + stream.reset()?; + // insert back into pool let key = PoolKey::new(&unit.url, &unit.req.proxy); state.pool().add(key, stream); } + + Ok(()) } fn do_read(&mut self, buf: &mut [u8]) -> io::Result { @@ -409,7 +415,7 @@ impl> Read for PoolReturnRead { // only if the underlying reader is exhausted can we send a new // request to the same socket. hence, we only return it now. if amount == 0 { - self.return_connection(); + self.return_connection()?; } Ok(amount) } diff --git a/src/stream.rs b/src/stream.rs index 8b89c23..0697f93 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -53,14 +53,7 @@ impl DeadlineStream { impl From for Stream { fn from(deadline_stream: DeadlineStream) -> Stream { - // Since we are turning this back into a regular, non-deadline Stream, - // remove any timeouts we set. - let stream = deadline_stream.stream; - if let Some(socket) = stream.socket() { - socket.set_read_timeout(None).unwrap(); - socket.set_write_timeout(None).unwrap(); - } - stream + deadline_stream.stream } } @@ -159,6 +152,17 @@ impl Stream { } } + pub(crate) fn reset(&mut self) -> io::Result<()> { + // When we are turning this back into a regular, non-deadline Stream, + // remove any timeouts we set. + if let Some(socket) = self.socket() { + socket.set_read_timeout(None)?; + socket.set_write_timeout(None)?; + } + + Ok(()) + } + pub(crate) fn socket(&self) -> Option<&TcpStream> { match self { Stream::Http(b) => Some(b.get_ref()), diff --git a/src/unit.rs b/src/unit.rs index 4a28a14..94953d6 100644 --- a/src/unit.rs +++ b/src/unit.rs @@ -238,9 +238,12 @@ pub(crate) fn connect( debug!("response {} to {} {}", resp.status(), method, url); + let mut stream: Stream = stream.into(); + stream.reset()?; + // since it is not a redirect, or we're not following redirects, // give away the incoming stream to the response object. - crate::response::set_stream(&mut resp, url.to_string(), Some(unit), stream.into()); + crate::response::set_stream(&mut resp, unit.url.to_string(), Some(unit), stream); // release the response Ok(resp)