Document and tidy up new Pool invariants. (#142)
Now that we allow multiple connections for the same PoolKey, update our invariants accordingly. Also provide a couple of helper functions for removing the first or last match of an entry in a VecDeque. This also changes which entry from `lru` gets removed when a stream is removed from the pool. Previously it was the oldest matching one. Now it's the newest matching one, which matches the semantics we are applying to `recycle[K]`. Followup to #133 /cc @cfal
This commit is contained in:
committed by
GitHub
parent
05ffe53e4c
commit
b87299e988
110
src/pool.rs
110
src/pool.rs
@@ -14,9 +14,25 @@ const DEFAULT_MAX_IDLE_CONNECTIONS_PER_HOST: usize = 1;
|
|||||||
|
|
||||||
/// Holder of recycled connections.
|
/// Holder of recycled connections.
|
||||||
///
|
///
|
||||||
/// Invariant: The length of recycle and lru are the same.
|
/// For each PoolKey (approximately hostname and port), there may be
|
||||||
/// Invariant: Each PoolKey exists as a key in recycle, and vice versa.
|
/// multiple connections stored in the `recycle` map. If so, they are stored in
|
||||||
/// Invariant: Each PoolKey exists in recycle at most once and lru at most once.
|
/// order from oldest at the front to freshest at the back.
|
||||||
|
///
|
||||||
|
/// The `lru` VecDeque is a companion struct to `recycle`, and is used to keep
|
||||||
|
/// track of which connections to expire if the pool is full on the next insert.
|
||||||
|
/// A given PoolKey can occur in lru multiple times. The first entry in lru for
|
||||||
|
/// a key K represents the first entry in `recycle[K]`. The second entry in lru
|
||||||
|
/// for `K` represents the second entry in `recycle[K]`, and so on. In other
|
||||||
|
/// words, `lru` is ordered the same way as the VecDeque entries in `recycle`:
|
||||||
|
/// oldest at the front, freshest at the back. This allows keeping track of which
|
||||||
|
/// host should have its connection dropped next.
|
||||||
|
///
|
||||||
|
/// These invariants hold at the start and end of each method:
|
||||||
|
/// - The length `lru` is equal to the sum of lengths of `recycle`'s VecDeques.
|
||||||
|
/// - Each PoolKey exists the same number of times in `lru` as it has entries in `recycle`.
|
||||||
|
/// - If there is an entry in `recycle`, it has at least one element.
|
||||||
|
/// - The length of `lru` is less than or equal to max_idle_connections.
|
||||||
|
/// - The length of recycle[K] is less than or equal to max_idle_connections_per_host.
|
||||||
///
|
///
|
||||||
/// *Internal API*
|
/// *Internal API*
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -32,6 +48,20 @@ pub(crate) struct ConnectionPool {
|
|||||||
max_idle_connections_per_host: usize,
|
max_idle_connections_per_host: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn remove_first_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
|
||||||
|
match list.iter().position(|x| x == key) {
|
||||||
|
Some(i) => list.remove(i),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_last_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
|
||||||
|
match list.iter().rposition(|x| x == key) {
|
||||||
|
Some(i) => list.remove(i),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for ConnectionPool {
|
impl Default for ConnectionPool {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -52,7 +82,6 @@ impl ConnectionPool {
|
|||||||
if self.max_idle_connections == max_connections {
|
if self.max_idle_connections == max_connections {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.max_idle_connections = max_connections;
|
|
||||||
|
|
||||||
if max_connections == 0 {
|
if max_connections == 0 {
|
||||||
// Clear the connection pool, caching is disabled.
|
// Clear the connection pool, caching is disabled.
|
||||||
@@ -65,13 +94,18 @@ impl ConnectionPool {
|
|||||||
while self.lru.len() > max_connections {
|
while self.lru.len() > max_connections {
|
||||||
self.remove_oldest();
|
self.remove_oldest();
|
||||||
}
|
}
|
||||||
|
self.max_idle_connections = max_connections;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return true if either of the max_* settings is 0, meaning we should do no work.
|
||||||
|
fn noop(&self) -> bool {
|
||||||
|
self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_max_idle_connections_per_host(&mut self, max_connections: usize) {
|
pub fn set_max_idle_connections_per_host(&mut self, max_connections: usize) {
|
||||||
if self.max_idle_connections_per_host == max_connections {
|
if self.max_idle_connections_per_host == max_connections {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.max_idle_connections_per_host = max_connections;
|
|
||||||
|
|
||||||
if max_connections == 0 {
|
if max_connections == 0 {
|
||||||
// Clear the connection pool, caching is disabled.
|
// Clear the connection pool, caching is disabled.
|
||||||
@@ -83,15 +117,13 @@ impl ConnectionPool {
|
|||||||
// Remove any extra streams if the number was decreased.
|
// Remove any extra streams if the number was decreased.
|
||||||
for (key, val) in self.recycle.iter_mut() {
|
for (key, val) in self.recycle.iter_mut() {
|
||||||
while val.len() > max_connections {
|
while val.len() > max_connections {
|
||||||
|
// Remove the oldest entry
|
||||||
val.pop_front();
|
val.pop_front();
|
||||||
let index = self
|
remove_first_match(&mut self.lru, key)
|
||||||
.lru
|
.expect("invariant failed: key in recycle but not in lru");
|
||||||
.iter()
|
|
||||||
.position(|x| x == key)
|
|
||||||
.expect("PoolKey not found in lru");
|
|
||||||
self.lru.remove(index);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.max_idle_connections_per_host = max_connections;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// How the unit::connect tries to get a pooled connection.
|
/// How the unit::connect tries to get a pooled connection.
|
||||||
@@ -106,38 +138,25 @@ impl ConnectionPool {
|
|||||||
let streams = occupied_entry.get_mut();
|
let streams = occupied_entry.get_mut();
|
||||||
// Take the newest stream.
|
// Take the newest stream.
|
||||||
let stream = streams.pop_back();
|
let stream = streams.pop_back();
|
||||||
assert!(
|
let stream = stream.expect("invariant failed: empty VecDeque in `recycle`");
|
||||||
stream.is_some(),
|
|
||||||
"key existed in recycle but no streams available"
|
|
||||||
);
|
|
||||||
|
|
||||||
if streams.len() == 0 {
|
if streams.len() == 0 {
|
||||||
occupied_entry.remove();
|
occupied_entry.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the oldest matching PoolKey from self.lru.
|
// Remove the newest matching PoolKey from self.lru. That
|
||||||
// since this PoolKey was most recently used, removing the oldest
|
// corresponds to the stream we just removed from `recycle`.
|
||||||
// PoolKey would delay other streams with this address from
|
remove_last_match(&mut self.lru, &key)
|
||||||
// being removed.
|
.expect("invariant failed: key in recycle but not in lru");
|
||||||
self.remove_from_lru(key);
|
|
||||||
|
|
||||||
stream
|
Some(stream)
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_from_lru(&mut self, key: &PoolKey) {
|
|
||||||
let index = self
|
|
||||||
.lru
|
|
||||||
.iter()
|
|
||||||
.position(|x| x == key)
|
|
||||||
.expect("PoolKey not found in lru");
|
|
||||||
self.lru.remove(index);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add(&mut self, key: PoolKey, stream: Stream) {
|
fn add(&mut self, key: PoolKey, stream: Stream) {
|
||||||
if self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0 {
|
if self.noop() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,14 +165,14 @@ impl ConnectionPool {
|
|||||||
let streams = occupied_entry.get_mut();
|
let streams = occupied_entry.get_mut();
|
||||||
streams.push_back(stream);
|
streams.push_back(stream);
|
||||||
if streams.len() > self.max_idle_connections_per_host {
|
if streams.len() > self.max_idle_connections_per_host {
|
||||||
|
// Remove the oldest entry
|
||||||
streams.pop_front();
|
streams.pop_front();
|
||||||
self.remove_from_lru(&key);
|
remove_first_match(&mut self.lru, &key)
|
||||||
|
.expect("invariant failed: key in recycle but not in lru");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Entry::Vacant(vacant_entry) => {
|
Entry::Vacant(vacant_entry) => {
|
||||||
let mut new_deque = VecDeque::new();
|
vacant_entry.insert(vec![stream].into());
|
||||||
new_deque.push_back(stream);
|
|
||||||
vacant_entry.insert(new_deque);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.lru.push_back(key);
|
self.lru.push_back(key);
|
||||||
@@ -162,26 +181,23 @@ impl ConnectionPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Find the oldest stream in the pool. Remove its representation from lru,
|
||||||
|
/// and the stream itself from `recycle`. Drops the stream, which closes it.
|
||||||
fn remove_oldest(&mut self) {
|
fn remove_oldest(&mut self) {
|
||||||
if let Some(key) = self.lru.pop_front() {
|
assert!(!self.noop(), "remove_oldest called on Pool with max of 0");
|
||||||
|
let key = self.lru.pop_front();
|
||||||
|
let key = key.expect("tried to remove oldest but no entries found!");
|
||||||
match self.recycle.entry(key) {
|
match self.recycle.entry(key) {
|
||||||
Entry::Occupied(mut occupied_entry) => {
|
Entry::Occupied(mut occupied_entry) => {
|
||||||
let streams = occupied_entry.get_mut();
|
let streams = occupied_entry.get_mut();
|
||||||
let removed_stream = streams.pop_front();
|
streams
|
||||||
assert!(
|
.pop_front()
|
||||||
removed_stream.is_some(),
|
.expect("invariant failed: key existed in recycle but no streams available");
|
||||||
"key existed in recycle but no streams available"
|
|
||||||
);
|
|
||||||
if streams.len() == 0 {
|
if streams.len() == 0 {
|
||||||
occupied_entry.remove();
|
occupied_entry.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => {
|
Entry::Vacant(_) => panic!("invariant failed: key existed in lru but not in recycle"),
|
||||||
panic!("invariant failed: key existed in lru but not in recycle")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
panic!("tried to remove oldest but no entries found!");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user