Allow non-empty connection pools to be dropped (#583)
This breaks a reference cycle between PoolReturner and Agent that was causing Agents (and their contained ConnectionPool) to never be dropped so long as there was any stream in the ConnectionPool. This cause sockets to leak over time, particularly when the convenience functions ureq::get(), ureq::post(), etc were used, since those functions create a new Agent each time.
This commit is contained in:
committed by
GitHub
parent
33fb553a28
commit
498e19943f
@@ -238,6 +238,10 @@ impl Agent {
|
||||
pub fn cookie_store(&self) -> CookieStoreGuard<'_> {
|
||||
self.state.cookie_tin.read_lock()
|
||||
}
|
||||
|
||||
pub(crate) fn weak_state(&self) -> std::sync::Weak<AgentState> {
|
||||
Arc::downgrade(&self.state)
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_MAX_IDLE_CONNECTIONS: usize = 100;
|
||||
|
||||
20
src/pool.rs
20
src/pool.rs
@@ -3,6 +3,7 @@ use std::collections::{HashMap, VecDeque};
|
||||
use std::io::{self, Read};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::agent::AgentState;
|
||||
use crate::stream::Stream;
|
||||
use crate::{Agent, Proxy};
|
||||
|
||||
@@ -229,14 +230,17 @@ impl PoolKey {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct PoolReturner {
|
||||
inner: Option<(Agent, PoolKey)>,
|
||||
// We store a weak reference to an agent state here to avoid creating
|
||||
// a reference loop, since AgentState contains a ConnectionPool, which
|
||||
// contains Streams, which contain PoolReturners.
|
||||
inner: Option<(std::sync::Weak<AgentState>, PoolKey)>,
|
||||
}
|
||||
|
||||
impl PoolReturner {
|
||||
/// A PoolReturner that returns to the given Agent's Pool.
|
||||
pub(crate) fn new(agent: Agent, pool_key: PoolKey) -> Self {
|
||||
pub(crate) fn new(agent: &Agent, pool_key: PoolKey) -> Self {
|
||||
Self {
|
||||
inner: Some((agent, pool_key)),
|
||||
inner: Some((agent.weak_state(), pool_key)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -246,8 +250,10 @@ impl PoolReturner {
|
||||
}
|
||||
|
||||
pub(crate) fn return_to_pool(&self, stream: Stream) {
|
||||
if let Some((agent, pool_key)) = &self.inner {
|
||||
agent.state.pool.add(pool_key, stream);
|
||||
if let Some((weak_state, pool_key)) = &self.inner {
|
||||
if let Some(state) = weak_state.upgrade() {
|
||||
state.pool.add(pool_key, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -433,7 +439,7 @@ mod tests {
|
||||
|
||||
let agent = Agent::new();
|
||||
let pool_key = PoolKey::new(&url, None);
|
||||
let stream = NoopStream::stream(PoolReturner::new(agent.clone(), pool_key));
|
||||
let stream = NoopStream::stream(PoolReturner::new(&agent, pool_key));
|
||||
let mut limited_read = LimitedRead::new(stream, std::num::NonZeroUsize::new(500).unwrap());
|
||||
|
||||
limited_read.read_exact(&mut out_buf).unwrap();
|
||||
@@ -472,7 +478,7 @@ mod tests {
|
||||
let stream = Stream::new(
|
||||
ro,
|
||||
"1.1.1.1:4343".parse().unwrap(),
|
||||
PoolReturner::new(agent.clone(), PoolKey::from_parts("http", "1.1.1.1", 8080)),
|
||||
PoolReturner::new(&agent, PoolKey::from_parts("http", "1.1.1.1", 8080)),
|
||||
);
|
||||
|
||||
let chunked = crate::chunked::Decoder::new(stream);
|
||||
|
||||
@@ -1161,10 +1161,7 @@ mod tests {
|
||||
let stream = Stream::new(
|
||||
test_stream,
|
||||
"1.1.1.1:4343".parse().unwrap(),
|
||||
PoolReturner::new(
|
||||
agent.clone(),
|
||||
PoolKey::from_parts("https", "example.com", 443),
|
||||
),
|
||||
PoolReturner::new(&agent, PoolKey::from_parts("https", "example.com", 443)),
|
||||
);
|
||||
Response::do_from_stream(
|
||||
stream,
|
||||
|
||||
@@ -316,7 +316,7 @@ pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error>
|
||||
//
|
||||
let port = unit.url.port().unwrap_or(80);
|
||||
let pool_key = PoolKey::from_parts("http", hostname, port);
|
||||
let pool_returner = PoolReturner::new(unit.agent.clone(), pool_key);
|
||||
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
|
||||
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r, pool_returner))
|
||||
}
|
||||
|
||||
@@ -328,7 +328,7 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error
|
||||
let tls_conf = &unit.agent.config.tls_config;
|
||||
let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
|
||||
let pool_key = PoolKey::from_parts("https", hostname, port);
|
||||
let pool_returner = PoolReturner::new(unit.agent.clone(), pool_key);
|
||||
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
|
||||
Ok(Stream::new(https_stream, remote_addr, pool_returner))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user