Limit max idle connections. (#81)

Adds limit of 100 connections to ConnectionPool. This is implemented
with a VecDeque acting as an LRU. When the pool becomes full, the oldest
stream is popped off the back from the VecDeque and also removed from
the map of PoolKeys to Streams.

Fixes #77
This commit is contained in:
Jacob Hoffman-Andrews
2020-06-23 23:44:47 -07:00
committed by GitHub
parent a6e99c8b36
commit 2d6747717d

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::io::{Read, Result as IoResult}; use std::io::{Read, Result as IoResult};
use crate::stream::Stream; use crate::stream::Stream;
@@ -7,14 +7,24 @@ use crate::unit::Unit;
use url::Url; use url::Url;
pub const DEFAULT_HOST: &str = "localhost"; pub const DEFAULT_HOST: &str = "localhost";
const MAX_IDLE_CONNECTIONS: usize = 100;
/// Holder of recycled connections. /// Holder of recycled connections.
/// ///
/// Invariant: The length of recycle and lru are the same.
/// Invariant: Each PoolKey exists as a key in recycle, and vice versa.
/// Invariant: Each PoolKey exists in recycle at most once and lru at most once.
///
/// *Internal API* /// *Internal API*
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub(crate) struct ConnectionPool { pub(crate) struct ConnectionPool {
// the actual pooled connection. however only one per hostname:port. // the actual pooled connection. however only one per hostname:port.
recycle: HashMap<PoolKey, Stream>, recycle: HashMap<PoolKey, Stream>,
// This is used to keep track of which streams to expire when the
// pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for
// recently used Streams are added to the back of the queue;
// old streams are removed from the front.
lru: VecDeque<PoolKey>,
} }
impl ConnectionPool { impl ConnectionPool {
@@ -26,7 +36,44 @@ impl ConnectionPool {
/// How the unit::connect tries to get a pooled connection. /// How the unit::connect tries to get a pooled connection.
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> { pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
self.recycle.remove(&PoolKey::new(url)) let key = PoolKey::new(url);
self.remove(&key)
}
fn remove(&mut self, key: &PoolKey) -> Option<Stream> {
if !self.recycle.contains_key(&key) {
return None;
}
let index = self.lru.iter().position(|k| k == key);
assert!(
index.is_some(),
"invariant failed: key existed in recycle but not lru"
);
self.lru.remove(index.unwrap());
self.recycle.remove(&key)
}
fn add(&mut self, key: PoolKey, stream: Stream) {
// If an entry with the same key already exists, remove it.
// The more recently used stream is likely to live longer.
self.remove(&key);
if self.recycle.len() + 1 > MAX_IDLE_CONNECTIONS {
self.remove_oldest();
}
self.lru.push_back(key.clone());
self.recycle.insert(key, stream);
}
fn remove_oldest(&mut self) {
if let Some(key) = self.lru.pop_front() {
let removed = self.recycle.remove(&key);
assert!(
removed.is_some(),
"invariant failed: key existed in lru but not in recycle"
);
} else {
panic!("tried to remove oldest but no entries found!");
}
} }
#[cfg(test)] #[cfg(test)]
@@ -35,13 +82,26 @@ impl ConnectionPool {
} }
} }
#[derive(Debug, PartialEq, Clone, Eq, Hash)] #[derive(PartialEq, Clone, Eq, Hash)]
struct PoolKey { struct PoolKey {
scheme: String, scheme: String,
hostname: String, hostname: String,
port: Option<u16>, port: Option<u16>,
} }
use std::fmt;
impl fmt::Debug for PoolKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!(
"{}|{}|{}",
self.scheme,
self.hostname,
self.port.unwrap_or(0)
))
}
}
impl PoolKey { impl PoolKey {
fn new(url: &Url) -> Self { fn new(url: &Url) -> Self {
let port = url.port_or_known_default(); let port = url.port_or_known_default();
@@ -59,6 +119,52 @@ fn poolkey_new() {
PoolKey::new(&Url::parse("zzz:///example.com").unwrap()); PoolKey::new(&Url::parse("zzz:///example.com").unwrap());
} }
#[test]
fn pool_size_limit() {
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
let mut pool = ConnectionPool::new();
let hostnames = (0..200).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
hostname,
port: Some(999),
});
for key in poolkeys.clone() {
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
}
assert_eq!(pool.len(), 100);
for key in poolkeys.skip(100) {
let result = pool.remove(&key);
assert!(result.is_some(), "expected key was not in pool");
}
}
#[test]
fn pool_duplicates_limit() {
// Test inserting duplicates into the pool, and subsequently
// filling and draining it. The duplicates should evict earlier
// entries with the same key.
assert_eq!(MAX_IDLE_CONNECTIONS, 100);
let mut pool = ConnectionPool::new();
let hostnames = (0..100).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
hostname,
port: Some(999),
});
for key in poolkeys.clone() {
pool.add(key.clone(), Stream::Cursor(std::io::Cursor::new(vec![])));
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
}
assert_eq!(pool.len(), 100);
for key in poolkeys {
let result = pool.remove(&key);
assert!(result.is_some(), "expected key was not in pool");
}
}
/// Read wrapper that returns the stream to the pool once the /// Read wrapper that returns the stream to the pool once the
/// read is exhausted (reached a 0). /// read is exhausted (reached a 0).
/// ///
@@ -91,7 +197,7 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
} }
// insert back into pool // insert back into pool
let key = PoolKey::new(&unit.url); let key = PoolKey::new(&unit.url);
agent.pool().recycle.insert(key, stream); agent.pool().add(key, stream);
} }
} }
} }