Push mutexes down into pool and cookie store. (#193)

Previously, Agent stored most of its state in one big
Arc<Mutex<AgentState>>. This separates the Arc from the Mutexes.
Now, Agent is a thin wrapper around an Arc<AgentState>. The individual
components that need locking, ConnectionPool and CookieStore, are now
responsible for their own locking.
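Roughly, the new shape is something like this (a simplified sketch with stub
types; the field names here are illustrative, not the exact ones in the crate):

    use std::sync::{Arc, Mutex};

    // Stub types standing in for the real ones; names are illustrative.
    struct Stream;
    struct Proxy;

    // CookieStore guards its own state instead of relying on a lock
    // around all of AgentState.
    struct CookieStore {
        inner: Mutex<Vec<String>>, // placeholder for real cookie storage
    }

    // ConnectionPool likewise keeps its mutable state behind an internal
    // Mutex, so its methods can take &self.
    struct ConnectionPool {
        inner: Mutex<Vec<Stream>>,
        max_idle_connections: usize,
    }

    // Only the components that actually mutate carry a lock; plain data
    // like the proxy needs none.
    struct AgentState {
        pool: ConnectionPool,
        cookie_store: CookieStore,
        proxy: Option<Proxy>,
    }

    // Agent is a thin, cheaply clonable handle around the shared state.
    #[derive(Clone)]
    struct Agent {
        state: Arc<AgentState>,
    }

    fn main() {
        let agent = Agent {
            state: Arc::new(AgentState {
                pool: ConnectionPool {
                    inner: Mutex::new(Vec::new()),
                    max_idle_connections: 100,
                },
                cookie_store: CookieStore { inner: Mutex::new(Vec::new()) },
                proxy: None,
            }),
        };
        let _other_handle = agent.clone(); // just an Arc clone
        // Locking the pool does not block the cookie store, and vice versa.
        agent.state.pool.inner.lock().unwrap().push(Stream);
        agent.state.cookie_store.inner.lock().unwrap().push("k=v".into());
        assert_eq!(agent.state.pool.inner.lock().unwrap().len(), 1);
    }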

There were a couple of reasons for this. Internal components that needed
an Agent were often instead carrying around an Arc<Mutex<AgentState>>.
This felt like the components were too intertwined: those other
components shouldn't have to care quite so much about how Agent is
implemented. Also, this led to compromises of convenience: the Proxy on
Agent wound up stored inside the `Arc<Mutex<AgentState>>` even though it
didn't need locking. It was more convenient that way because that was
what Request and Unit had access to.

The other reason to push things down like this is that it can reduce
lock contention. Mutations to the cookie store don't need to lock the
connection pool, and vice versa. This was a secondary concern, since I
haven't actually profiled these things and found them to be a problem,
but it's a happy result of the refactoring.

Now all the components outside of Agent take an Agent instead of
AgentState.

In the process I removed `Agent.cookie()`. Its API was hard to use
correctly, since it didn't distinguish between cookies on different
hosts, and it would have needed updates as part of this refactoring
anyway. I'm open to reinstating similar functionality with a refreshed API.

I kept `Agent.set_cookie`, but updated its method signature to take a
URL as well as a cookie.
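
For illustration, usage now looks something like this (stub types; the exact
parameter types and order of the real signature are an assumption here):

    // Stubs standing in for the crate's Agent, Cookie, and Url types.
    struct Cookie {
        name: String,
        value: String,
    }
    struct Url(String);
    struct Agent;

    impl Agent {
        // The caller now says which URL the cookie belongs to, so cookies
        // for different hosts are no longer conflated.
        fn set_cookie(&self, cookie: Cookie, url: &Url) {
            println!("store {}={} for {}", cookie.name, cookie.value, url.0);
        }
    }

    fn main() {
        let agent = Agent;
        let url = Url("https://example.com/".to_string());
        agent.set_cookie(
            Cookie { name: "session".into(), value: "abc123".into() },
            &url,
        );
    }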

Many of ConnectionPool's methods went from `&mut self` to `&self`,
because ConnectionPool is now using interior mutability.
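
The pattern in isolation looks roughly like this (a toy pool, much simplified
from the real one):

    use std::collections::VecDeque;
    use std::sync::Mutex;

    // Toy pool showing the interior-mutability pattern: the Mutex lives
    // inside the struct, so mutating methods only need &self.
    struct Pool {
        inner: Mutex<VecDeque<String>>,
        max: usize,
    }

    impl Pool {
        fn new(max: usize) -> Self {
            Pool { inner: Mutex::new(VecDeque::new()), max }
        }

        // &self, not &mut self: the lock provides the exclusive access.
        fn add(&self, conn: String) {
            let mut inner = self.inner.lock().unwrap();
            inner.push_back(conn);
            if inner.len() > self.max {
                inner.pop_front(); // evict the oldest entry
            }
        }

        fn try_get(&self) -> Option<String> {
            self.inner.lock().unwrap().pop_back()
        }
    }

    fn main() {
        let pool = Pool::new(2); // note: no `mut` binding needed
        pool.add("conn-a".to_string());
        pool.add("conn-b".to_string());
        assert_eq!(pool.try_get(), Some("conn-b".to_string()));
    }
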
Author: Jacob Hoffman-Andrews
Date: 2020-10-20 00:03:45 -07:00 (committed by GitHub)
Commit: 703ca41960 (parent: 75bc803cf1)
7 changed files with 136 additions and 120 deletions


@@ -1,6 +1,7 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::io::{self, Read};
use std::sync::Mutex;
use crate::stream::Stream;
use crate::unit::Unit;
@@ -34,8 +35,13 @@ const DEFAULT_MAX_IDLE_CONNECTIONS_PER_HOST: usize = 1;
/// - The length of recycle[K] is less than or equal to max_idle_connections_per_host.
///
/// *Internal API*
#[derive(Debug)]
pub(crate) struct ConnectionPool {
inner: Mutex<Inner>,
max_idle_connections: usize,
max_idle_connections_per_host: usize,
}
struct Inner {
// the actual pooled connection. however only one per hostname:port.
recycle: HashMap<PoolKey, VecDeque<Stream>>,
// This is used to keep track of which streams to expire when the
@@ -43,10 +49,17 @@ pub(crate) struct ConnectionPool {
// recently used Streams are added to the back of the queue;
// old streams are removed from the front.
lru: VecDeque<PoolKey>,
max_idle_connections: usize,
max_idle_connections_per_host: usize,
}
impl fmt::Debug for ConnectionPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectionPool")
.field("max_idle", &self.max_idle_connections)
.field("max_idle_per_host", &self.max_idle_connections_per_host)
.field("connections", &self.inner.lock().unwrap().lru.len())
.finish()
}
}
fn remove_first_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
match list.iter().position(|x| x == key) {
Some(i) => list.remove(i),
@@ -66,8 +79,10 @@ impl Default for ConnectionPool {
Self {
max_idle_connections: DEFAULT_MAX_IDLE_CONNECTIONS,
max_idle_connections_per_host: DEFAULT_MAX_IDLE_CONNECTIONS_PER_HOST,
recycle: HashMap::default(),
lru: VecDeque::default(),
inner: Mutex::new(Inner {
recycle: HashMap::default(),
lru: VecDeque::default(),
}),
}
}
}
@@ -75,8 +90,10 @@ impl Default for ConnectionPool {
impl ConnectionPool {
pub(crate) fn new(max_idle_connections: usize, max_idle_connections_per_host: usize) -> Self {
ConnectionPool {
recycle: Default::default(),
lru: Default::default(),
inner: Mutex::new(Inner {
recycle: HashMap::default(),
lru: VecDeque::default(),
}),
max_idle_connections,
max_idle_connections_per_host,
}
@@ -88,13 +105,14 @@ impl ConnectionPool {
}
/// How the unit::connect tries to get a pooled connection.
pub fn try_get_connection(&mut self, url: &Url, proxy: &Option<Proxy>) -> Option<Stream> {
pub fn try_get_connection(&self, url: &Url, proxy: &Option<Proxy>) -> Option<Stream> {
let key = PoolKey::new(url, proxy);
self.remove(&key)
}
fn remove(&mut self, key: &PoolKey) -> Option<Stream> {
match self.recycle.entry(key.clone()) {
fn remove(&self, key: &PoolKey) -> Option<Stream> {
let mut inner = self.inner.lock().unwrap();
match inner.recycle.entry(key.clone()) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
// Take the newest stream.
@@ -107,7 +125,7 @@ impl ConnectionPool {
// Remove the newest matching PoolKey from self.lru. That
// corresponds to the stream we just removed from `recycle`.
remove_last_match(&mut self.lru, &key)
remove_last_match(&mut inner.lru, &key)
.expect("invariant failed: key in recycle but not in lru");
Some(stream)
@@ -116,19 +134,20 @@ impl ConnectionPool {
}
}
fn add(&mut self, key: PoolKey, stream: Stream) {
fn add(&self, key: PoolKey, stream: Stream) {
if self.noop() {
return;
}
match self.recycle.entry(key.clone()) {
let mut inner = self.inner.lock().unwrap();
match inner.recycle.entry(key.clone()) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
streams.push_back(stream);
if streams.len() > self.max_idle_connections_per_host {
// Remove the oldest entry
streams.pop_front();
remove_first_match(&mut self.lru, &key)
remove_first_match(&mut inner.lru, &key)
.expect("invariant failed: key in recycle but not in lru");
}
}
@@ -136,19 +155,21 @@ impl ConnectionPool {
vacant_entry.insert(vec![stream].into());
}
}
self.lru.push_back(key);
if self.lru.len() > self.max_idle_connections {
inner.lru.push_back(key);
if inner.lru.len() > self.max_idle_connections {
drop(inner);
self.remove_oldest()
}
}
/// Find the oldest stream in the pool. Remove its representation from lru,
/// and the stream itself from `recycle`. Drops the stream, which closes it.
fn remove_oldest(&mut self) {
fn remove_oldest(&self) {
assert!(!self.noop(), "remove_oldest called on Pool with max of 0");
let key = self.lru.pop_front();
let mut inner = self.inner.lock().unwrap();
let key = inner.lru.pop_front();
let key = key.expect("tried to remove oldest but no entries found!");
match self.recycle.entry(key) {
match inner.recycle.entry(key) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
streams
@@ -164,7 +185,7 @@ impl ConnectionPool {
#[cfg(test)]
pub fn len(&self) -> usize {
self.lru.len()
self.inner.lock().unwrap().lru.len()
}
}
@@ -212,7 +233,7 @@ fn pool_connections_limit() {
// Test inserting connections with different keys into the pool,
// filling and draining it. The pool should evict earlier connections
// when the connection limit is reached.
let mut pool = ConnectionPool::default();
let pool = ConnectionPool::default();
let hostnames = (0..DEFAULT_MAX_IDLE_CONNECTIONS * 2).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
@@ -237,7 +258,7 @@ fn pool_per_host_connections_limit() {
// Test inserting connections with the same key into the pool,
// filling and draining it. The pool should evict earlier connections
// when the per-host connection limit is reached.
let mut pool = ConnectionPool::default();
let pool = ConnectionPool::default();
let poolkey = PoolKey {
scheme: "https".to_string(),
hostname: "example.com".to_string(),
@@ -264,7 +285,7 @@ fn pool_per_host_connections_limit() {
fn pool_checks_proxy() {
// Test inserting different poolkeys with same address but different proxies.
// Each insertion should result in an additional entry in the pool.
let mut pool = ConnectionPool::default();
let pool = ConnectionPool::default();
let url = Url::parse("zzz:///example.com").unwrap();
pool.add(
@@ -311,7 +332,6 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
fn return_connection(&mut self) -> io::Result<()> {
// guard we only do this once.
if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) {
let state = &mut unit.req.agent.lock().unwrap();
// bring back stream here to either go into pool or dealloc
let mut stream = reader.into();
if !stream.is_poolable() {
@@ -324,7 +344,7 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
// insert back into pool
let key = PoolKey::new(&unit.url, &unit.req.proxy);
state.pool().add(key, stream);
unit.req.agent.state.pool.add(key, stream);
}
Ok(())