diff --git a/src/pool.rs b/src/pool.rs index e1a3e62..4663647 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -14,9 +14,25 @@ const DEFAULT_MAX_IDLE_CONNECTIONS_PER_HOST: usize = 1; /// Holder of recycled connections. /// -/// Invariant: The length of recycle and lru are the same. -/// Invariant: Each PoolKey exists as a key in recycle, and vice versa. -/// Invariant: Each PoolKey exists in recycle at most once and lru at most once. +/// For each PoolKey (approximately hostname and port), there may be +/// multiple connections stored in the `recycle` map. If so, they are stored in +/// order from oldest at the front to freshest at the back. +/// +/// The `lru` VecDeque is a companion struct to `recycle`, and is used to keep +/// track of which connections to expire if the pool is full on the next insert. +/// A given PoolKey can occur in lru multiple times. The first entry in lru for +/// a key K represents the first entry in `recycle[K]`. The second entry in lru +/// for `K` represents the second entry in `recycle[K]`, and so on. In other +/// words, `lru` is ordered the same way as the VecDeque entries in `recycle`: +/// oldest at the front, freshest at the back. This allows keeping track of which +/// host should have its connection dropped next. +/// +/// These invariants hold at the start and end of each method: +/// - The length `lru` is equal to the sum of lengths of `recycle`'s VecDeques. +/// - Each PoolKey exists the same number of times in `lru` as it has entries in `recycle`. +/// - If there is an entry in `recycle`, it has at least one element. +/// - The length of `lru` is less than or equal to max_idle_connections. +/// - The length of recycle[K] is less than or equal to max_idle_connections_per_host. /// /// *Internal API* #[derive(Debug)] @@ -32,6 +48,20 @@ pub(crate) struct ConnectionPool { max_idle_connections_per_host: usize, } +fn remove_first_match(list: &mut VecDeque, key: &PoolKey) -> Option { + match list.iter().position(|x| x == key) { + Some(i) => list.remove(i), + None => None, + } +} + +fn remove_last_match(list: &mut VecDeque, key: &PoolKey) -> Option { + match list.iter().rposition(|x| x == key) { + Some(i) => list.remove(i), + None => None, + } +} + impl Default for ConnectionPool { fn default() -> Self { Self { @@ -52,7 +82,6 @@ impl ConnectionPool { if self.max_idle_connections == max_connections { return; } - self.max_idle_connections = max_connections; if max_connections == 0 { // Clear the connection pool, caching is disabled. @@ -65,13 +94,18 @@ impl ConnectionPool { while self.lru.len() > max_connections { self.remove_oldest(); } + self.max_idle_connections = max_connections; + } + + /// Return true if either of the max_* settings is 0, meaning we should do no work. + fn noop(&self) -> bool { + self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0 } pub fn set_max_idle_connections_per_host(&mut self, max_connections: usize) { if self.max_idle_connections_per_host == max_connections { return; } - self.max_idle_connections_per_host = max_connections; if max_connections == 0 { // Clear the connection pool, caching is disabled. @@ -83,15 +117,13 @@ impl ConnectionPool { // Remove any extra streams if the number was decreased. for (key, val) in self.recycle.iter_mut() { while val.len() > max_connections { + // Remove the oldest entry val.pop_front(); - let index = self - .lru - .iter() - .position(|x| x == key) - .expect("PoolKey not found in lru"); - self.lru.remove(index); + remove_first_match(&mut self.lru, key) + .expect("invariant failed: key in recycle but not in lru"); } } + self.max_idle_connections_per_host = max_connections; } /// How the unit::connect tries to get a pooled connection. @@ -106,38 +138,25 @@ impl ConnectionPool { let streams = occupied_entry.get_mut(); // Take the newest stream. let stream = streams.pop_back(); - assert!( - stream.is_some(), - "key existed in recycle but no streams available" - ); + let stream = stream.expect("invariant failed: empty VecDeque in `recycle`"); if streams.len() == 0 { occupied_entry.remove(); } - // Remove the oldest matching PoolKey from self.lru. - // since this PoolKey was most recently used, removing the oldest - // PoolKey would delay other streams with this address from - // being removed. - self.remove_from_lru(key); + // Remove the newest matching PoolKey from self.lru. That + // corresponds to the stream we just removed from `recycle`. + remove_last_match(&mut self.lru, &key) + .expect("invariant failed: key in recycle but not in lru"); - stream + Some(stream) } Entry::Vacant(_) => None, } } - fn remove_from_lru(&mut self, key: &PoolKey) { - let index = self - .lru - .iter() - .position(|x| x == key) - .expect("PoolKey not found in lru"); - self.lru.remove(index); - } - fn add(&mut self, key: PoolKey, stream: Stream) { - if self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0 { + if self.noop() { return; } @@ -146,14 +165,14 @@ impl ConnectionPool { let streams = occupied_entry.get_mut(); streams.push_back(stream); if streams.len() > self.max_idle_connections_per_host { + // Remove the oldest entry streams.pop_front(); - self.remove_from_lru(&key); + remove_first_match(&mut self.lru, &key) + .expect("invariant failed: key in recycle but not in lru"); } } Entry::Vacant(vacant_entry) => { - let mut new_deque = VecDeque::new(); - new_deque.push_back(stream); - vacant_entry.insert(new_deque); + vacant_entry.insert(vec![stream].into()); } } self.lru.push_back(key); @@ -162,26 +181,23 @@ impl ConnectionPool { } } + /// Find the oldest stream in the pool. Remove its representation from lru, + /// and the stream itself from `recycle`. Drops the stream, which closes it. fn remove_oldest(&mut self) { - if let Some(key) = self.lru.pop_front() { - match self.recycle.entry(key) { - Entry::Occupied(mut occupied_entry) => { - let streams = occupied_entry.get_mut(); - let removed_stream = streams.pop_front(); - assert!( - removed_stream.is_some(), - "key existed in recycle but no streams available" - ); - if streams.len() == 0 { - occupied_entry.remove(); - } - } - Entry::Vacant(_) => { - panic!("invariant failed: key existed in lru but not in recycle") + assert!(!self.noop(), "remove_oldest called on Pool with max of 0"); + let key = self.lru.pop_front(); + let key = key.expect("tried to remove oldest but no entries found!"); + match self.recycle.entry(key) { + Entry::Occupied(mut occupied_entry) => { + let streams = occupied_entry.get_mut(); + streams + .pop_front() + .expect("invariant failed: key existed in recycle but no streams available"); + if streams.len() == 0 { + occupied_entry.remove(); } } - } else { - panic!("tried to remove oldest but no entries found!"); + Entry::Vacant(_) => panic!("invariant failed: key existed in lru but not in recycle"), } }