diff --git a/src/pool.rs b/src/pool.rs index f6a154e..f1311ce 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -7,6 +7,7 @@ use crate::stream::Stream; use crate::unit::Unit; use crate::Proxy; +use log::debug; use url::Url; /// Holder of recycled connections. @@ -115,6 +116,7 @@ impl ConnectionPool { remove_last_match(&mut inner.lru, &key) .expect("invariant failed: key in recycle but not in lru"); + debug!("pulling stream from pool: {:?} -> {:?}", key, stream); Some(stream) } Entry::Vacant(_) => None, @@ -125,6 +127,7 @@ impl ConnectionPool { if self.noop() { return; } + debug!("adding stream to pool: {:?} -> {:?}", key, stream); let mut inner = self.inner.lock().unwrap(); match inner.recycle.entry(key.clone()) { @@ -133,7 +136,13 @@ impl ConnectionPool { streams.push_back(stream); if streams.len() > self.max_idle_connections_per_host { // Remove the oldest entry - streams.pop_front(); + let stream = streams.pop_front().expect("empty streams list"); + debug!( + "host {:?} has {} conns, dropping oldest: {:?}", + key, + streams.len(), + stream + ); remove_first_match(&mut inner.lru, &key) .expect("invariant failed: key in recycle but not in lru"); } @@ -159,9 +168,10 @@ impl ConnectionPool { match inner.recycle.entry(key) { Entry::Occupied(mut occupied_entry) => { let streams = occupied_entry.get_mut(); - streams + let stream = streams .pop_front() .expect("invariant failed: key existed in recycle but no streams available"); + debug!("dropping oldest stream in pool: {:?}", stream); if streams.len() == 0 { occupied_entry.remove(); } diff --git a/src/stream.rs b/src/stream.rs index bb3419a..149771e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -111,35 +111,38 @@ pub(crate) fn io_err_timeout(error: String) -> io::Error { impl fmt::Debug for Stream { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut result = f.debug_struct("Stream"); match self.inner.get_ref() { - Inner::Http(tcpstream) => result.field("tcp", tcpstream), + Inner::Http(tcpstream) => write!(f, "{:?}", tcpstream), #[cfg(feature = "tls")] - Inner::Https(tlsstream) => result.field("tls", tlsstream.get_ref()), - Inner::Test(_, _) => result.field("test", &String::new()), - }; - result.finish() + Inner::Https(tlsstream) => write!(f, "{:?}", tlsstream.get_ref()), + Inner::Test(_, _) => write!(f, "Stream(Test)"), + } } } impl Stream { + fn logged_create(stream: Stream) -> Stream { + debug!("created stream: {:?}", stream); + stream + } + pub(crate) fn from_vec(v: Vec) -> Stream { - Stream { + Stream::logged_create(Stream { inner: BufReader::new(Inner::Test(Box::new(Cursor::new(v)), vec![])), - } + }) } fn from_tcp_stream(t: TcpStream) -> Stream { - Stream { + Stream::logged_create(Stream { inner: BufReader::new(Inner::Http(t)), - } + }) } #[cfg(feature = "tls")] fn from_tls_stream(t: StreamOwned) -> Stream { - Stream { + Stream::logged_create(Stream { inner: BufReader::new(Inner::Https(t)), - } + }) } // Check if the server has closed a stream by performing a one-byte @@ -206,8 +209,8 @@ impl Stream { } #[cfg(test)] - pub fn to_write_vec(self) -> Vec { - match self.inner.into_inner() { + pub fn to_write_vec(&self) -> Vec { + match self.inner.get_ref() { Inner::Test(_, writer) => writer.clone(), _ => panic!("to_write_vec on non Test stream"), } @@ -298,6 +301,12 @@ impl Write for Stream { } } +impl Drop for Stream { + fn drop(&mut self) { + debug!("dropping stream: {:?}", self); + } +} + pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result { // let port = unit.url.port().unwrap_or(80); @@ -388,7 +397,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result None, }; - debug!("connecting to {}", &sock_addr); + debug!("connecting to {} at {}", netloc, &sock_addr); // connect with a configured timeout. let stream = if Some(Proto::SOCKS5) == proto { connect_socks5( diff --git a/src/unit.rs b/src/unit.rs index 4f794de..934b6db 100644 --- a/src/unit.rs +++ b/src/unit.rs @@ -188,7 +188,7 @@ pub(crate) fn connect( if let Err(err) = send_result { if is_recycled { - debug!("retrying request early {} {}", method, url); + debug!("retrying request early {} {}: {}", method, url, err); // we try open a new connection, this time there will be // no connection in the pool. don't use it. return connect(unit, false, redirect_count, body, redir); @@ -218,7 +218,7 @@ pub(crate) fn connect( // up to N+1 total tries, where N is max_idle_connections_per_host. let mut resp = match result { Err(err) if err.connection_closed() && retryable && is_recycled => { - debug!("retrying request {} {}", method, url); + debug!("retrying request {} {}: {}", method, url, err); let empty = Payload::Empty.into_read(); return connect(unit, false, redirect_count, empty, redir); } @@ -312,19 +312,17 @@ fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stre scheme => return Err(ErrorKind::UnknownScheme.msg(&format!("unknown scheme '{}'", scheme))), }; if use_pooled { - let agent = &unit.agent; + let pool = &unit.agent.state.pool; + let proxy = &unit.agent.config.proxy; // The connection may have been closed by the server // due to idle timeout while it was sitting in the pool. // Loop until we find one that is still good or run out of connections. - while let Some(stream) = agent - .state - .pool - .try_get_connection(&unit.url, unit.agent.config.proxy.clone()) - { + while let Some(stream) = pool.try_get_connection(&unit.url, proxy.clone()) { let server_closed = stream.server_closed()?; if !server_closed { return Ok((stream, true)); } + debug!("dropping stream from pool; closed by server: {:?}", stream); } } let stream = match unit.url.scheme() {