Add debug logs for stream pooling.

This commit is contained in:
Jacob Hoffman-Andrews
2020-11-29 00:53:12 -08:00
committed by Martin Algesten
parent 37f991fa50
commit 35c03521b9
3 changed files with 42 additions and 25 deletions

View File

@@ -7,6 +7,7 @@ use crate::stream::Stream;
use crate::unit::Unit; use crate::unit::Unit;
use crate::Proxy; use crate::Proxy;
use log::debug;
use url::Url; use url::Url;
/// Holder of recycled connections. /// Holder of recycled connections.
@@ -115,6 +116,7 @@ impl ConnectionPool {
remove_last_match(&mut inner.lru, &key) remove_last_match(&mut inner.lru, &key)
.expect("invariant failed: key in recycle but not in lru"); .expect("invariant failed: key in recycle but not in lru");
debug!("pulling stream from pool: {:?} -> {:?}", key, stream);
Some(stream) Some(stream)
} }
Entry::Vacant(_) => None, Entry::Vacant(_) => None,
@@ -125,6 +127,7 @@ impl ConnectionPool {
if self.noop() { if self.noop() {
return; return;
} }
debug!("adding stream to pool: {:?} -> {:?}", key, stream);
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
match inner.recycle.entry(key.clone()) { match inner.recycle.entry(key.clone()) {
@@ -133,7 +136,13 @@ impl ConnectionPool {
streams.push_back(stream); streams.push_back(stream);
if streams.len() > self.max_idle_connections_per_host { if streams.len() > self.max_idle_connections_per_host {
// Remove the oldest entry // Remove the oldest entry
streams.pop_front(); let stream = streams.pop_front().expect("empty streams list");
debug!(
"host {:?} has {} conns, dropping oldest: {:?}",
key,
streams.len(),
stream
);
remove_first_match(&mut inner.lru, &key) remove_first_match(&mut inner.lru, &key)
.expect("invariant failed: key in recycle but not in lru"); .expect("invariant failed: key in recycle but not in lru");
} }
@@ -159,9 +168,10 @@ impl ConnectionPool {
match inner.recycle.entry(key) { match inner.recycle.entry(key) {
Entry::Occupied(mut occupied_entry) => { Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut(); let streams = occupied_entry.get_mut();
streams let stream = streams
.pop_front() .pop_front()
.expect("invariant failed: key existed in recycle but no streams available"); .expect("invariant failed: key existed in recycle but no streams available");
debug!("dropping oldest stream in pool: {:?}", stream);
if streams.len() == 0 { if streams.len() == 0 {
occupied_entry.remove(); occupied_entry.remove();
} }

View File

@@ -111,35 +111,38 @@ pub(crate) fn io_err_timeout(error: String) -> io::Error {
impl fmt::Debug for Stream { impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut result = f.debug_struct("Stream");
match self.inner.get_ref() { match self.inner.get_ref() {
Inner::Http(tcpstream) => result.field("tcp", tcpstream), Inner::Http(tcpstream) => write!(f, "{:?}", tcpstream),
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
Inner::Https(tlsstream) => result.field("tls", tlsstream.get_ref()), Inner::Https(tlsstream) => write!(f, "{:?}", tlsstream.get_ref()),
Inner::Test(_, _) => result.field("test", &String::new()), Inner::Test(_, _) => write!(f, "Stream(Test)"),
}; }
result.finish()
} }
} }
impl Stream { impl Stream {
fn logged_create(stream: Stream) -> Stream {
debug!("created stream: {:?}", stream);
stream
}
pub(crate) fn from_vec(v: Vec<u8>) -> Stream { pub(crate) fn from_vec(v: Vec<u8>) -> Stream {
Stream { Stream::logged_create(Stream {
inner: BufReader::new(Inner::Test(Box::new(Cursor::new(v)), vec![])), inner: BufReader::new(Inner::Test(Box::new(Cursor::new(v)), vec![])),
} })
} }
fn from_tcp_stream(t: TcpStream) -> Stream { fn from_tcp_stream(t: TcpStream) -> Stream {
Stream { Stream::logged_create(Stream {
inner: BufReader::new(Inner::Http(t)), inner: BufReader::new(Inner::Http(t)),
} })
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
fn from_tls_stream(t: StreamOwned<ClientSession, TcpStream>) -> Stream { fn from_tls_stream(t: StreamOwned<ClientSession, TcpStream>) -> Stream {
Stream { Stream::logged_create(Stream {
inner: BufReader::new(Inner::Https(t)), inner: BufReader::new(Inner::Https(t)),
} })
} }
// Check if the server has closed a stream by performing a one-byte // Check if the server has closed a stream by performing a one-byte
@@ -206,8 +209,8 @@ impl Stream {
} }
#[cfg(test)] #[cfg(test)]
pub fn to_write_vec(self) -> Vec<u8> { pub fn to_write_vec(&self) -> Vec<u8> {
match self.inner.into_inner() { match self.inner.get_ref() {
Inner::Test(_, writer) => writer.clone(), Inner::Test(_, writer) => writer.clone(),
_ => panic!("to_write_vec on non Test stream"), _ => panic!("to_write_vec on non Test stream"),
} }
@@ -298,6 +301,12 @@ impl Write for Stream {
} }
} }
impl Drop for Stream {
fn drop(&mut self) {
debug!("dropping stream: {:?}", self);
}
}
pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> { pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
// //
let port = unit.url.port().unwrap_or(80); let port = unit.url.port().unwrap_or(80);
@@ -388,7 +397,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
None => None, None => None,
}; };
debug!("connecting to {}", &sock_addr); debug!("connecting to {} at {}", netloc, &sock_addr);
// connect with a configured timeout. // connect with a configured timeout.
let stream = if Some(Proto::SOCKS5) == proto { let stream = if Some(Proto::SOCKS5) == proto {
connect_socks5( connect_socks5(

View File

@@ -188,7 +188,7 @@ pub(crate) fn connect(
if let Err(err) = send_result { if let Err(err) = send_result {
if is_recycled { if is_recycled {
debug!("retrying request early {} {}", method, url); debug!("retrying request early {} {}: {}", method, url, err);
// we try open a new connection, this time there will be // we try open a new connection, this time there will be
// no connection in the pool. don't use it. // no connection in the pool. don't use it.
return connect(unit, false, redirect_count, body, redir); return connect(unit, false, redirect_count, body, redir);
@@ -218,7 +218,7 @@ pub(crate) fn connect(
// up to N+1 total tries, where N is max_idle_connections_per_host. // up to N+1 total tries, where N is max_idle_connections_per_host.
let mut resp = match result { let mut resp = match result {
Err(err) if err.connection_closed() && retryable && is_recycled => { Err(err) if err.connection_closed() && retryable && is_recycled => {
debug!("retrying request {} {}", method, url); debug!("retrying request {} {}: {}", method, url, err);
let empty = Payload::Empty.into_read(); let empty = Payload::Empty.into_read();
return connect(unit, false, redirect_count, empty, redir); return connect(unit, false, redirect_count, empty, redir);
} }
@@ -312,19 +312,17 @@ fn connect_socket(unit: &Unit, hostname: &str, use_pooled: bool) -> Result<(Stre
scheme => return Err(ErrorKind::UnknownScheme.msg(&format!("unknown scheme '{}'", scheme))), scheme => return Err(ErrorKind::UnknownScheme.msg(&format!("unknown scheme '{}'", scheme))),
}; };
if use_pooled { if use_pooled {
let agent = &unit.agent; let pool = &unit.agent.state.pool;
let proxy = &unit.agent.config.proxy;
// The connection may have been closed by the server // The connection may have been closed by the server
// due to idle timeout while it was sitting in the pool. // due to idle timeout while it was sitting in the pool.
// Loop until we find one that is still good or run out of connections. // Loop until we find one that is still good or run out of connections.
while let Some(stream) = agent while let Some(stream) = pool.try_get_connection(&unit.url, proxy.clone()) {
.state
.pool
.try_get_connection(&unit.url, unit.agent.config.proxy.clone())
{
let server_closed = stream.server_closed()?; let server_closed = stream.server_closed()?;
if !server_closed { if !server_closed {
return Ok((stream, true)); return Ok((stream, true));
} }
debug!("dropping stream from pool; closed by server: {:?}", stream);
} }
} }
let stream = match unit.url.scheme() { let stream = match unit.url.scheme() {