Return stream to pool on exact read (#509)
If the user reads exactly the number of bytes in the response, then drops the Read object, streams will never get returned to the pool since the user never triggered a read past the end of the LimitedRead. This fixes that by making PoolReturnRead aware of the level below it, so it can ask if a stream is "done" without actually doing a read. Also, a refactorign: Previously, Response contained an Option<Box<Unit>> because the testing method `from_str()` would construct a Response with no associated Unit. However, this increased code complexity with no corresponding test benefit. Instead, construct a fake Unit in from_str(). Also, instead of taking an `Option<Box<Unit>>`, PoolReturnRead now takes a URL (to figure out host and port for the PoolKey) and an &Agent where it will return the stream. This cuts interconnectedness somewhat: PoolReturnRead doesn't need to know about Unit anymore.
This commit is contained in:
committed by
GitHub
parent
4d77d365a0
commit
101467f13f
102
src/pool.rs
102
src/pool.rs
@@ -3,10 +3,11 @@ use std::collections::{HashMap, VecDeque};
|
||||
use std::io::{self, Read};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::response::LimitedRead;
|
||||
use crate::stream::Stream;
|
||||
use crate::unit::Unit;
|
||||
use crate::Proxy;
|
||||
use crate::{Agent, Proxy};
|
||||
|
||||
use chunked_transfer::Decoder;
|
||||
use log::debug;
|
||||
use url::Url;
|
||||
|
||||
@@ -123,7 +124,7 @@ impl ConnectionPool {
|
||||
}
|
||||
}
|
||||
|
||||
fn add(&self, key: PoolKey, stream: Stream) {
|
||||
fn add(&self, key: &PoolKey, stream: Stream) {
|
||||
if self.noop() {
|
||||
return;
|
||||
}
|
||||
@@ -143,7 +144,7 @@ impl ConnectionPool {
|
||||
streams.len(),
|
||||
stream
|
||||
);
|
||||
remove_first_match(&mut inner.lru, &key)
|
||||
remove_first_match(&mut inner.lru, key)
|
||||
.expect("invariant failed: key in recycle but not in lru");
|
||||
}
|
||||
}
|
||||
@@ -151,7 +152,7 @@ impl ConnectionPool {
|
||||
vacant_entry.insert(vec![stream].into());
|
||||
}
|
||||
}
|
||||
inner.lru.push_back(key);
|
||||
inner.lru.push_back(key.clone());
|
||||
if inner.lru.len() > self.max_idle_connections {
|
||||
drop(inner);
|
||||
self.remove_oldest()
|
||||
@@ -219,28 +220,32 @@ impl PoolKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// Read wrapper that returns the stream to the pool once the
|
||||
/// Read wrapper that returns a stream to the pool once the
|
||||
/// read is exhausted (reached a 0).
|
||||
///
|
||||
/// *Internal API*
|
||||
pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
|
||||
// unit that contains the agent where we want to return the reader.
|
||||
unit: Option<Box<Unit>>,
|
||||
// wrapped reader around the same stream
|
||||
// the agent where we want to return the stream.
|
||||
agent: Agent,
|
||||
// wrapped reader around the same stream. It's an Option because we `take()` it
|
||||
// upon returning the stream to the Agent.
|
||||
reader: Option<R>,
|
||||
// Key under which to store the stream when we're done.
|
||||
key: PoolKey,
|
||||
}
|
||||
|
||||
impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
|
||||
pub fn new(unit: Option<Box<Unit>>, reader: R) -> Self {
|
||||
pub fn new(agent: &Agent, url: &Url, reader: R) -> Self {
|
||||
PoolReturnRead {
|
||||
unit,
|
||||
agent: agent.clone(),
|
||||
key: PoolKey::new(url, agent.config.proxy.clone()),
|
||||
reader: Some(reader),
|
||||
}
|
||||
}
|
||||
|
||||
fn return_connection(&mut self) -> io::Result<()> {
|
||||
// guard we only do this once.
|
||||
if let (Some(unit), Some(reader)) = (self.unit.take(), self.reader.take()) {
|
||||
if let Some(reader) = self.reader.take() {
|
||||
// bring back stream here to either go into pool or dealloc
|
||||
let mut stream = reader.into();
|
||||
if !stream.is_poolable() {
|
||||
@@ -252,8 +257,7 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
|
||||
stream.reset()?;
|
||||
|
||||
// insert back into pool
|
||||
let key = PoolKey::new(&unit.url, unit.agent.config.proxy.clone());
|
||||
unit.agent.state.pool.add(key, stream);
|
||||
self.agent.state.pool.add(&self.key, stream);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -267,12 +271,33 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
|
||||
// Done allows a reader to indicate it is done (next read will return Ok(0))
|
||||
// without actually performing a read. This is useful so LimitedRead can
|
||||
// inform PoolReturnRead to return a stream to the pool even if the user
|
||||
// never read past the end of the response (For instance because their
|
||||
// application is handling length information on its own).
|
||||
pub(crate) trait Done {
|
||||
fn done(&self) -> bool;
|
||||
}
|
||||
|
||||
impl<R: Read> Done for LimitedRead<R> {
|
||||
fn done(&self) -> bool {
|
||||
self.remaining() == 0
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Done for Decoder<R> {
|
||||
fn done(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let amount = self.do_read(buf)?;
|
||||
// only if the underlying reader is exhausted can we send a new
|
||||
// request to the same socket. hence, we only return it now.
|
||||
if amount == 0 {
|
||||
if amount == 0 || self.reader.as_ref().map(|r| r.done()).unwrap_or_default() {
|
||||
self.return_connection()?;
|
||||
}
|
||||
Ok(amount)
|
||||
@@ -303,7 +328,7 @@ mod tests {
|
||||
proxy: None,
|
||||
});
|
||||
for key in poolkeys.clone() {
|
||||
pool.add(key, Stream::from_vec(vec![]))
|
||||
pool.add(&key, Stream::from_vec(vec![]))
|
||||
}
|
||||
assert_eq!(pool.len(), pool.max_idle_connections);
|
||||
|
||||
@@ -328,7 +353,7 @@ mod tests {
|
||||
};
|
||||
|
||||
for _ in 0..pool.max_idle_connections_per_host * 2 {
|
||||
pool.add(poolkey.clone(), Stream::from_vec(vec![]))
|
||||
pool.add(&poolkey, Stream::from_vec(vec![]))
|
||||
}
|
||||
assert_eq!(pool.len(), pool.max_idle_connections_per_host);
|
||||
|
||||
@@ -345,23 +370,42 @@ mod tests {
|
||||
// Each insertion should result in an additional entry in the pool.
|
||||
let pool = ConnectionPool::new_with_limits(10, 1);
|
||||
let url = Url::parse("zzz:///example.com").unwrap();
|
||||
let pool_key = PoolKey::new(&url, None);
|
||||
|
||||
pool.add(PoolKey::new(&url, None), Stream::from_vec(vec![]));
|
||||
pool.add(&pool_key, Stream::from_vec(vec![]));
|
||||
assert_eq!(pool.len(), 1);
|
||||
|
||||
pool.add(
|
||||
PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap())),
|
||||
Stream::from_vec(vec![]),
|
||||
);
|
||||
let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap()));
|
||||
|
||||
pool.add(&pool_key, Stream::from_vec(vec![]));
|
||||
assert_eq!(pool.len(), 2);
|
||||
|
||||
pool.add(
|
||||
PoolKey::new(
|
||||
&url,
|
||||
Some(Proxy::new("user:password@localhost:9999").unwrap()),
|
||||
),
|
||||
Stream::from_vec(vec![]),
|
||||
let pool_key = PoolKey::new(
|
||||
&url,
|
||||
Some(Proxy::new("user:password@localhost:9999").unwrap()),
|
||||
);
|
||||
|
||||
pool.add(&pool_key, Stream::from_vec(vec![]));
|
||||
assert_eq!(pool.len(), 3);
|
||||
}
|
||||
|
||||
// Test that a stream gets returned to the pool if it was wrapped in a LimitedRead, and
|
||||
// user reads the exact right number of bytes (but never gets a read of 0 bytes).
|
||||
#[test]
|
||||
fn read_exact() {
|
||||
let url = Url::parse("https:///example.com").unwrap();
|
||||
|
||||
let mut out_buf = [0u8; 500];
|
||||
let long_vec = vec![0u8; 1000];
|
||||
|
||||
let agent = Agent::new();
|
||||
let stream = Stream::from_vec_poolable(long_vec);
|
||||
let limited_read = LimitedRead::new(stream, 500);
|
||||
|
||||
let mut pool_return_read = PoolReturnRead::new(&agent, &url, limited_read);
|
||||
|
||||
pool_return_read.read_exact(&mut out_buf).unwrap();
|
||||
|
||||
assert_eq!(agent.state.pool.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user