diff --git a/src/pool.rs b/src/pool.rs index f5db413..07e28c4 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::io::{Read, Result as IoResult}; use crate::stream::Stream; @@ -7,14 +7,24 @@ use crate::unit::Unit; use url::Url; pub const DEFAULT_HOST: &str = "localhost"; +const MAX_IDLE_CONNECTIONS: usize = 100; /// Holder of recycled connections. /// +/// Invariant: The length of recycle and lru are the same. +/// Invariant: Each PoolKey exists as a key in recycle, and vice versa. +/// Invariant: Each PoolKey exists in recycle at most once and lru at most once. +/// /// *Internal API* #[derive(Default, Debug)] pub(crate) struct ConnectionPool { // the actual pooled connection. however only one per hostname:port. recycle: HashMap, + // This is used to keep track of which streams to expire when the + // pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for + // recently used Streams are added to the back of the queue; + // old streams are removed from the front. + lru: VecDeque, } impl ConnectionPool { @@ -26,7 +36,44 @@ impl ConnectionPool { /// How the unit::connect tries to get a pooled connection. pub fn try_get_connection(&mut self, url: &Url) -> Option { - self.recycle.remove(&PoolKey::new(url)) + let key = PoolKey::new(url); + self.remove(&key) + } + + fn remove(&mut self, key: &PoolKey) -> Option { + if !self.recycle.contains_key(&key) { + return None; + } + let index = self.lru.iter().position(|k| k == key); + assert!( + index.is_some(), + "invariant failed: key existed in recycle but not lru" + ); + self.lru.remove(index.unwrap()); + self.recycle.remove(&key) + } + + fn add(&mut self, key: PoolKey, stream: Stream) { + // If an entry with the same key already exists, remove it. + // The more recently used stream is likely to live longer. + self.remove(&key); + if self.recycle.len() + 1 > MAX_IDLE_CONNECTIONS { + self.remove_oldest(); + } + self.lru.push_back(key.clone()); + self.recycle.insert(key, stream); + } + + fn remove_oldest(&mut self) { + if let Some(key) = self.lru.pop_front() { + let removed = self.recycle.remove(&key); + assert!( + removed.is_some(), + "invariant failed: key existed in lru but not in recycle" + ); + } else { + panic!("tried to remove oldest but no entries found!"); + } } #[cfg(test)] @@ -35,13 +82,26 @@ impl ConnectionPool { } } -#[derive(Debug, PartialEq, Clone, Eq, Hash)] +#[derive(PartialEq, Clone, Eq, Hash)] struct PoolKey { scheme: String, hostname: String, port: Option, } +use std::fmt; + +impl fmt::Debug for PoolKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_fmt(format_args!( + "{}|{}|{}", + self.scheme, + self.hostname, + self.port.unwrap_or(0) + )) + } +} + impl PoolKey { fn new(url: &Url) -> Self { let port = url.port_or_known_default(); @@ -59,6 +119,52 @@ fn poolkey_new() { PoolKey::new(&Url::parse("zzz:///example.com").unwrap()); } +#[test] +fn pool_size_limit() { + assert_eq!(MAX_IDLE_CONNECTIONS, 100); + let mut pool = ConnectionPool::new(); + let hostnames = (0..200).map(|i| format!("{}.example", i)); + let poolkeys = hostnames.map(|hostname| PoolKey { + scheme: "https".to_string(), + hostname, + port: Some(999), + }); + for key in poolkeys.clone() { + pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![]))); + } + assert_eq!(pool.len(), 100); + + for key in poolkeys.skip(100) { + let result = pool.remove(&key); + assert!(result.is_some(), "expected key was not in pool"); + } +} + +#[test] +fn pool_duplicates_limit() { + // Test inserting duplicates into the pool, and subsequently + // filling and draining it. The duplicates should evict earlier + // entries with the same key. + assert_eq!(MAX_IDLE_CONNECTIONS, 100); + let mut pool = ConnectionPool::new(); + let hostnames = (0..100).map(|i| format!("{}.example", i)); + let poolkeys = hostnames.map(|hostname| PoolKey { + scheme: "https".to_string(), + hostname, + port: Some(999), + }); + for key in poolkeys.clone() { + pool.add(key.clone(), Stream::Cursor(std::io::Cursor::new(vec![]))); + pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![]))); + } + assert_eq!(pool.len(), 100); + + for key in poolkeys { + let result = pool.remove(&key); + assert!(result.is_some(), "expected key was not in pool"); + } +} + /// Read wrapper that returns the stream to the pool once the /// read is exhausted (reached a 0). /// @@ -91,7 +197,7 @@ impl> PoolReturnRead { } // insert back into pool let key = PoolKey::new(&unit.url); - agent.pool().recycle.insert(key, stream); + agent.pool().add(key, stream); } } }