Redesign pooling mechanic (#565)

Introduce PoolReturner, a handle on an agent and a PoolKey that is
capable of returning a Stream to a Pool. Make Streams keep track of
their own PoolReturner, instead of having PoolReturnRead keep track of
that information.

For the LimitedRead code path, get rid of PoolReturnRead. Instead,
LimitedRead is responsible for returning its Stream to the Pool after
its second-to-last read. In other words, LimitedRead will return the
stream if the next read is guaranteed to return Ok(0).

Constructing a LimitedRead of size 0 is always wrong, because we could
always just return the stream immediately. Change the size argument to
NonZeroUsize to enforce that.

Remove the Done trait, which was only used for LimitedRead. It was used
to try and make sure we returned the stream to the pool on exact reads,
but was not reliable.

This does not yet move the ChunkDecoder code path away from
PoolReturnRead. That requires a little more work.

Part 1 of #559.  Fixes #555.
This commit is contained in:
Jacob Hoffman-Andrews
2022-12-03 23:42:58 -08:00
committed by GitHub
parent d8225b22ed
commit 9083d692f8
4 changed files with 197 additions and 113 deletions

View File

@@ -3,11 +3,9 @@ use std::collections::{HashMap, VecDeque};
use std::io::{self, Read}; use std::io::{self, Read};
use std::sync::Mutex; use std::sync::Mutex;
use crate::response::LimitedRead;
use crate::stream::Stream; use crate::stream::Stream;
use crate::{Agent, Proxy}; use crate::{Agent, Proxy};
use chunked_transfer::Decoder;
use log::debug; use log::debug;
use url::Url; use url::Url;
@@ -124,7 +122,7 @@ impl ConnectionPool {
} }
} }
fn add(&self, key: &PoolKey, stream: Stream) { pub(crate) fn add(&self, key: &PoolKey, stream: Stream) {
if self.noop() { if self.noop() {
return; return;
} }
@@ -188,7 +186,7 @@ impl ConnectionPool {
} }
#[derive(PartialEq, Clone, Eq, Hash)] #[derive(PartialEq, Clone, Eq, Hash)]
struct PoolKey { pub(crate) struct PoolKey {
scheme: String, scheme: String,
hostname: String, hostname: String,
port: Option<u16>, port: Option<u16>,
@@ -218,6 +216,40 @@ impl PoolKey {
proxy, proxy,
} }
} }
pub(crate) fn from_parts(scheme: &str, hostname: &str, port: u16) -> Self {
PoolKey {
scheme: scheme.to_string(),
hostname: hostname.to_string(),
port: Some(port),
proxy: None,
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct PoolReturner {
inner: Option<(Agent, PoolKey)>,
}
impl PoolReturner {
/// A PoolReturner that returns to the given Agent's Pool.
pub(crate) fn new(agent: Agent, pool_key: PoolKey) -> Self {
Self {
inner: Some((agent, pool_key)),
}
}
/// A PoolReturner that does nothing
pub(crate) fn none() -> Self {
Self { inner: None }
}
pub(crate) fn return_to_pool(&self, stream: Stream) {
if let Some((agent, pool_key)) = &self.inner {
agent.state.pool.add(pool_key, stream);
}
}
} }
/// Read wrapper that returns a stream to the pool once the /// Read wrapper that returns a stream to the pool once the
@@ -225,20 +257,14 @@ impl PoolKey {
/// ///
/// *Internal API* /// *Internal API*
pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> { pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
// the agent where we want to return the stream.
agent: Agent,
// wrapped reader around the same stream. It's an Option because we `take()` it // wrapped reader around the same stream. It's an Option because we `take()` it
// upon returning the stream to the Agent. // upon returning the stream to the Agent.
reader: Option<R>, reader: Option<R>,
// Key under which to store the stream when we're done.
key: PoolKey,
} }
impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> { impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
pub fn new(agent: &Agent, url: &Url, reader: R) -> Self { pub fn new(reader: R) -> Self {
PoolReturnRead { PoolReturnRead {
agent: agent.clone(),
key: PoolKey::new(url, agent.config.proxy.clone()),
reader: Some(reader), reader: Some(reader),
} }
} }
@@ -247,13 +273,8 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
// guard we only do this once. // guard we only do this once.
if let Some(reader) = self.reader.take() { if let Some(reader) = self.reader.take() {
// bring back stream here to either go into pool or dealloc // bring back stream here to either go into pool or dealloc
let mut stream = reader.into(); let stream: Stream = reader.into();
stream.return_to_pool()?;
// ensure stream can be reused
stream.reset()?;
// insert back into pool
self.agent.state.pool.add(&self.key, stream);
} }
Ok(()) Ok(())
@@ -267,33 +288,12 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
} }
} }
// Done allows a reader to indicate it is done (next read will return Ok(0)) impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
// without actually performing a read. This is useful so LimitedRead can
// inform PoolReturnRead to return a stream to the pool even if the user
// never read past the end of the response (For instance because their
// application is handling length information on its own).
pub(crate) trait Done {
fn done(&self) -> bool;
}
impl<R: Read> Done for LimitedRead<R> {
fn done(&self) -> bool {
self.remaining() == 0
}
}
impl<R: Read> Done for Decoder<R> {
fn done(&self) -> bool {
false
}
}
impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let amount = self.do_read(buf)?; let amount = self.do_read(buf)?;
// only if the underlying reader is exhausted can we send a new // only if the underlying reader is exhausted can we send a new
// request to the same socket. hence, we only return it now. // request to the same socket. hence, we only return it now.
if amount == 0 || self.reader.as_ref().map(|r| r.done()).unwrap_or_default() { if amount == 0 {
self.return_connection()?; self.return_connection()?;
} }
Ok(amount) Ok(amount)
@@ -313,8 +313,8 @@ mod tests {
struct NoopStream; struct NoopStream;
impl NoopStream { impl NoopStream {
fn stream() -> Stream { fn stream(pool_returner: PoolReturner) -> Stream {
Stream::new(NoopStream, remote_addr_for_test()) Stream::new(NoopStream, remote_addr_for_test(), pool_returner)
} }
} }
@@ -360,7 +360,7 @@ mod tests {
proxy: None, proxy: None,
}); });
for key in poolkeys.clone() { for key in poolkeys.clone() {
pool.add(&key, NoopStream::stream()); pool.add(&key, NoopStream::stream(PoolReturner::none()));
} }
assert_eq!(pool.len(), pool.max_idle_connections); assert_eq!(pool.len(), pool.max_idle_connections);
@@ -385,7 +385,7 @@ mod tests {
}; };
for _ in 0..pool.max_idle_connections_per_host * 2 { for _ in 0..pool.max_idle_connections_per_host * 2 {
pool.add(&poolkey, NoopStream::stream()) pool.add(&poolkey, NoopStream::stream(PoolReturner::none()))
} }
assert_eq!(pool.len(), pool.max_idle_connections_per_host); assert_eq!(pool.len(), pool.max_idle_connections_per_host);
@@ -404,12 +404,12 @@ mod tests {
let url = Url::parse("zzz:///example.com").unwrap(); let url = Url::parse("zzz:///example.com").unwrap();
let pool_key = PoolKey::new(&url, None); let pool_key = PoolKey::new(&url, None);
pool.add(&pool_key, NoopStream::stream()); pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 1); assert_eq!(pool.len(), 1);
let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap())); let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap()));
pool.add(&pool_key, NoopStream::stream()); pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 2); assert_eq!(pool.len(), 2);
let pool_key = PoolKey::new( let pool_key = PoolKey::new(
@@ -417,7 +417,7 @@ mod tests {
Some(Proxy::new("user:password@localhost:9999").unwrap()), Some(Proxy::new("user:password@localhost:9999").unwrap()),
); );
pool.add(&pool_key, NoopStream::stream()); pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 3); assert_eq!(pool.len(), 3);
} }
@@ -425,17 +425,18 @@ mod tests {
// user reads the exact right number of bytes (but never gets a read of 0 bytes). // user reads the exact right number of bytes (but never gets a read of 0 bytes).
#[test] #[test]
fn read_exact() { fn read_exact() {
use crate::response::LimitedRead;
let url = Url::parse("https:///example.com").unwrap(); let url = Url::parse("https:///example.com").unwrap();
let mut out_buf = [0u8; 500]; let mut out_buf = [0u8; 500];
let agent = Agent::new(); let agent = Agent::new();
let stream = NoopStream::stream(); let pool_key = PoolKey::new(&url, None);
let limited_read = LimitedRead::new(stream, 500); let stream = NoopStream::stream(PoolReturner::new(agent.clone(), pool_key));
let mut limited_read = LimitedRead::new(stream, std::num::NonZeroUsize::new(500).unwrap());
let mut pool_return_read = PoolReturnRead::new(&agent, &url, limited_read); limited_read.read_exact(&mut out_buf).unwrap();
pool_return_read.read_exact(&mut out_buf).unwrap();
assert_eq!(agent.state.pool.len(), 1); assert_eq!(agent.state.pool.len(), 1);
} }
@@ -448,6 +449,7 @@ mod tests {
fn read_exact_chunked_gzip() { fn read_exact_chunked_gzip() {
use crate::response::Compression; use crate::response::Compression;
use chunked_transfer::Decoder as ChunkDecoder; use chunked_transfer::Decoder as ChunkDecoder;
use std::io::Cursor;
let gz_body = vec![ let gz_body = vec![
b'E', b'\r', b'\n', // 14 first chunk b'E', b'\r', b'\n', // 14 first chunk
@@ -464,28 +466,19 @@ mod tests {
b'\r', b'\n', // b'\r', b'\n', //
]; ];
println!("{:?}", gz_body);
impl ReadWrite for io::Cursor<Vec<u8>> {
fn socket(&self) -> Option<&std::net::TcpStream> {
None
}
}
impl From<io::Cursor<Vec<u8>>> for Stream {
fn from(c: io::Cursor<Vec<u8>>) -> Self {
Stream::new(c, "1.1.1.1:8080".parse().unwrap())
}
}
let agent = Agent::new(); let agent = Agent::new();
let url = Url::parse("https://example.com").unwrap();
assert_eq!(agent.state.pool.len(), 0); assert_eq!(agent.state.pool.len(), 0);
let chunked = ChunkDecoder::new(io::Cursor::new(gz_body)); let ro = crate::test::TestStream::new(Cursor::new(gz_body), std::io::sink());
let stream = Stream::new(
ro,
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(agent.clone(), PoolKey::from_parts("http", "1.1.1.1", 8080)),
);
let chunked = ChunkDecoder::new(stream);
let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> = let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> =
Box::new(PoolReturnRead::new(&agent, &url, chunked)); Box::new(PoolReturnRead::new(chunked));
let compression = Compression::Gzip; let compression = Compression::Gzip;
let mut stream = compression.wrap_reader(pool_return_read); let mut stream = compression.wrap_reader(pool_return_read);

View File

@@ -1,5 +1,6 @@
use std::io::{self, Cursor, Read}; use std::io::{self, Cursor, Read};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::str::FromStr; use std::str::FromStr;
use std::{fmt, io::BufRead}; use std::{fmt, io::BufRead};
@@ -10,7 +11,7 @@ use url::Url;
use crate::body::SizedReader; use crate::body::SizedReader;
use crate::error::{Error, ErrorKind::BadStatus}; use crate::error::{Error, ErrorKind::BadStatus};
use crate::header::{get_all_headers, get_header, Header, HeaderLine}; use crate::header::{get_all_headers, get_header, Header, HeaderLine};
use crate::pool::PoolReturnRead; use crate::pool::{PoolReturnRead, PoolReturner};
use crate::stream::{DeadlineStream, ReadOnlyStream, Stream}; use crate::stream::{DeadlineStream, ReadOnlyStream, Stream};
use crate::unit::Unit; use crate::unit::Unit;
use crate::{stream, Agent, ErrorKind}; use crate::{stream, Agent, ErrorKind};
@@ -327,7 +328,7 @@ impl Response {
let inner = stream.inner_ref(); let inner = stream.inner_ref();
let result = inner.set_read_timeout(unit.agent.config.timeout_read); let result = inner.set_read_timeout(unit.agent.config.timeout_read);
if let Err(e) = result { if let Err(e) = result {
return Box::new(ErrorReader(e)) as Box<dyn Read + Send + Sync + 'static>; return Box::new(ErrorReader(e));
} }
let buffer_len = inner.buffer().len(); let buffer_len = inner.buffer().len();
@@ -337,29 +338,40 @@ impl Response {
// to the connection pool. // to the connection pool.
BodyType::Chunked => { BodyType::Chunked => {
debug!("Chunked body in response"); debug!("Chunked body in response");
Box::new(PoolReturnRead::new( Box::new(PoolReturnRead::new(ChunkDecoder::new(stream)))
&unit.agent,
&unit.url,
ChunkDecoder::new(stream),
))
} }
// Responses with a content-length header means we should limit the reading // Responses with a content-length header means we should limit the reading
// of the body to the number of bytes in the header. Once done, we can // of the body to the number of bytes in the header. Once done, we can
// return the underlying stream to the connection pool. // return the underlying stream to the connection pool.
BodyType::LengthDelimited(len) => { BodyType::LengthDelimited(len) => {
let mut pooler = match NonZeroUsize::new(len) {
PoolReturnRead::new(&unit.agent, &unit.url, LimitedRead::new(stream, len)); None => {
debug!("zero-length body returning stream directly to pool");
let stream: Stream = stream.into();
// TODO: This expect can actually panic if we get an error when
// returning the stream to the pool. We reset the read timeouts
// when we do that, and since that's a syscall it can fail.
stream.return_to_pool().expect("returning stream to pool");
Box::new(std::io::empty())
}
Some(len) => {
let mut limited_read = LimitedRead::new(stream, len);
if len <= buffer_len { if len.get() <= buffer_len {
debug!("Body entirely buffered (length: {})", len); debug!("Body entirely buffered (length: {})", len);
let mut buf = vec![0; len]; let mut buf = vec![0; len.get()];
pooler // TODO: This expect can actually panic if we get an error when
// returning the stream to the pool. We reset the read timeouts
// when we do that, and since that's a syscall it can fail.
limited_read
.read_exact(&mut buf) .read_exact(&mut buf)
.expect("failed to read exact buffer length from stream"); .expect("failed to read exact buffer length from stream");
Box::new(Cursor::new(buf)) Box::new(Cursor::new(buf))
} else { } else {
debug!("Streaming body until content-length: {}", len); debug!("Streaming body until content-length: {}", len);
Box::new(pooler) Box::new(limited_read)
}
}
} }
} }
BodyType::CloseDelimited => { BodyType::CloseDelimited => {
@@ -698,7 +710,11 @@ impl FromStr for Response {
/// ``` /// ```
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
let remote_addr = "0.0.0.0:0".parse().unwrap(); let remote_addr = "0.0.0.0:0".parse().unwrap();
let stream = Stream::new(ReadOnlyStream::new(s.into()), remote_addr); let stream = Stream::new(
ReadOnlyStream::new(s.into()),
remote_addr,
PoolReturner::none(),
);
let request_url = "https://example.com".parse().unwrap(); let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader { let request_reader = SizedReader {
size: crate::body::BodySize::Empty, size: crate::body::BodySize::Empty,
@@ -763,16 +779,16 @@ fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<Header
/// Limits a `Read` to a content size (as set by a "Content-Length" header). /// Limits a `Read` to a content size (as set by a "Content-Length" header).
pub(crate) struct LimitedRead<R> { pub(crate) struct LimitedRead<R> {
reader: R, reader: Option<R>,
limit: usize, limit: usize,
position: usize, position: usize,
} }
impl<R: Read> LimitedRead<R> { impl<R: Read + Sized + Into<Stream>> LimitedRead<R> {
pub(crate) fn new(reader: R, limit: usize) -> Self { pub(crate) fn new(reader: R, limit: NonZeroUsize) -> Self {
LimitedRead { LimitedRead {
reader, reader: Some(reader),
limit, limit: limit.get(),
position: 0, position: 0,
} }
} }
@@ -780,9 +796,20 @@ impl<R: Read> LimitedRead<R> {
pub(crate) fn remaining(&self) -> usize { pub(crate) fn remaining(&self) -> usize {
self.limit - self.position self.limit - self.position
} }
fn return_stream_to_pool(&mut self) -> io::Result<()> {
if let Some(reader) = self.reader.take() {
// Convert back to a stream. If return_to_pool fails, the stream will
// drop and the connection will be closed.
let stream: Stream = reader.into();
stream.return_to_pool()?;
}
Ok(())
}
} }
impl<R: Read> Read for LimitedRead<R> { impl<R: Read + Sized + Into<Stream>> Read for LimitedRead<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.remaining() == 0 { if self.remaining() == 0 {
return Ok(0); return Ok(0);
@@ -792,18 +819,27 @@ impl<R: Read> Read for LimitedRead<R> {
} else { } else {
buf buf
}; };
match self.reader.read(from) { let reader = match self.reader.as_mut() {
// If the reader has already been taken, return Ok(0) to all reads.
None => return Ok(0),
Some(r) => r,
};
match reader.read(from) {
// https://tools.ietf.org/html/rfc7230#page-33 // https://tools.ietf.org/html/rfc7230#page-33
// If the sender closes the connection or // If the sender closes the connection or
// the recipient times out before the indicated number of octets are // the recipient times out before the indicated number of octets are
// received, the recipient MUST consider the message to be // received, the recipient MUST consider the message to be
// incomplete and close the connection. // incomplete and close the connection.
// TODO: actually close the connection by dropping the stream
Ok(0) => Err(io::Error::new( Ok(0) => Err(io::Error::new(
io::ErrorKind::UnexpectedEof, io::ErrorKind::UnexpectedEof,
"response body closed before all bytes were read", "response body closed before all bytes were read",
)), )),
Ok(amount) => { Ok(amount) => {
self.position += amount; self.position += amount;
if self.remaining() == 0 {
self.return_stream_to_pool()?;
}
Ok(amount) Ok(amount)
} }
Err(e) => Err(e), Err(e) => Err(e),
@@ -811,15 +847,6 @@ impl<R: Read> Read for LimitedRead<R> {
} }
} }
impl<R: Read> From<LimitedRead<R>> for Stream
where
Stream: From<R>,
{
fn from(limited_read: LimitedRead<R>) -> Stream {
limited_read.reader.into()
}
}
/// Extract the charset from a "Content-Type" header. /// Extract the charset from a "Content-Type" header.
/// ///
/// "Content-Type: text/plain; charset=iso8859-1" -> "iso8859-1" /// "Content-Type: text/plain; charset=iso8859-1" -> "iso8859-1"
@@ -852,12 +879,20 @@ impl Read for ErrorReader {
mod tests { mod tests {
use std::io::Cursor; use std::io::Cursor;
use crate::{body::Payload, pool::PoolKey};
use super::*; use super::*;
#[test] #[test]
fn short_read() { fn short_read() {
use std::io::Cursor; use std::io::Cursor;
let mut lr = LimitedRead::new(Cursor::new(vec![b'a'; 3]), 10); let test_stream = crate::test::TestStream::new(Cursor::new(vec![b'a'; 3]), std::io::sink());
let stream = Stream::new(
test_stream,
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::none(),
);
let mut lr = LimitedRead::new(stream, std::num::NonZeroUsize::new(10).unwrap());
let mut buf = vec![0; 1000]; let mut buf = vec![0; 1000];
let result = lr.read_to_end(&mut buf); let result = lr.read_to_end(&mut buf);
assert!(result.err().unwrap().kind() == io::ErrorKind::UnexpectedEof); assert!(result.err().unwrap().kind() == io::ErrorKind::UnexpectedEof);
@@ -1062,6 +1097,7 @@ mod tests {
let s = Stream::new( let s = Stream::new(
ReadOnlyStream::new(v), ReadOnlyStream::new(v),
crate::stream::remote_addr_for_test(), crate::stream::remote_addr_for_test(),
PoolReturner::none(),
); );
let request_url = "https://example.com".parse().unwrap(); let request_url = "https://example.com".parse().unwrap();
let request_reader = SizedReader { let request_reader = SizedReader {
@@ -1112,4 +1148,39 @@ mod tests {
println!("Response size: {}", size); println!("Response size: {}", size);
assert!(size < 400); // 200 on Macbook M1 assert!(size < 400); // 200 on Macbook M1
} }
// Test that a stream gets returned to the pool immediately for a zero-length response, and
// that reads from the response's body consistently return Ok(0).
#[test]
fn zero_length_body_immediate_return() {
use std::io::Cursor;
let response_bytes = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
.as_bytes()
.to_vec();
let test_stream =
crate::test::TestStream::new(Cursor::new(response_bytes), std::io::sink());
let agent = Agent::new();
let agent2 = agent.clone();
let stream = Stream::new(
test_stream,
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(
agent.clone(),
PoolKey::from_parts("https", "example.com", 443),
),
);
Response::do_from_stream(
stream,
Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
assert_eq!(agent2.state.pool.len(), 1);
}
} }

View File

@@ -11,6 +11,7 @@ use chunked_transfer::Decoder as ChunkDecoder;
#[cfg(feature = "socks-proxy")] #[cfg(feature = "socks-proxy")]
use socks::{TargetAddr, ToTargetAddr}; use socks::{TargetAddr, ToTargetAddr};
use crate::pool::{PoolKey, PoolReturner};
use crate::proxy::Proxy; use crate::proxy::Proxy;
use crate::{error::Error, proxy::Proto}; use crate::{error::Error, proxy::Proto};
@@ -40,6 +41,7 @@ pub(crate) struct Stream {
inner: BufReader<Box<dyn ReadWrite>>, inner: BufReader<Box<dyn ReadWrite>>,
/// The remote address the stream is connected to. /// The remote address the stream is connected to.
pub(crate) remote_addr: SocketAddr, pub(crate) remote_addr: SocketAddr,
pool_returner: PoolReturner,
} }
impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> { impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
@@ -179,10 +181,15 @@ impl fmt::Debug for Stream {
} }
impl Stream { impl Stream {
pub(crate) fn new(t: impl ReadWrite, remote_addr: SocketAddr) -> Stream { pub(crate) fn new(
t: impl ReadWrite,
remote_addr: SocketAddr,
pool_returner: PoolReturner,
) -> Stream {
Stream::logged_create(Stream { Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)), inner: BufReader::new(Box::new(t)),
remote_addr, remote_addr,
pool_returner,
}) })
} }
@@ -235,6 +242,13 @@ impl Stream {
} }
} }
pub(crate) fn return_to_pool(mut self) -> io::Result<()> {
// ensure stream can be reused
self.reset()?;
self.pool_returner.clone().return_to_pool(self);
Ok(())
}
pub(crate) fn reset(&mut self) -> io::Result<()> { pub(crate) fn reset(&mut self) -> io::Result<()> {
// When we are turning this back into a regular, non-deadline Stream, // When we are turning this back into a regular, non-deadline Stream,
// remove any timeouts we set. // remove any timeouts we set.
@@ -303,8 +317,9 @@ impl Drop for Stream {
pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> { pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
// //
let port = unit.url.port().unwrap_or(80); let port = unit.url.port().unwrap_or(80);
let pool_key = PoolKey::from_parts("http", hostname, port);
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r)) let pool_returner = PoolReturner::new(unit.agent.clone(), pool_key);
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r, pool_returner))
} }
pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> { pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
@@ -314,7 +329,9 @@ pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error
let tls_conf = &unit.agent.config.tls_config; let tls_conf = &unit.agent.config.tls_config;
let https_stream = tls_conf.connect(hostname, Box::new(sock))?; let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
Ok(Stream::new(https_stream, remote_addr)) let pool_key = PoolKey::from_parts("https", hostname, port);
let pool_returner = PoolReturner::new(unit.agent.clone(), pool_key);
Ok(Stream::new(https_stream, remote_addr, pool_returner))
} }
/// If successful, returns a `TcpStream` and the remote address it is connected to. /// If successful, returns a `TcpStream` and the remote address it is connected to.
@@ -671,7 +688,7 @@ mod tests {
let recorder = ReadRecorder { let recorder = ReadRecorder {
reads: reads.clone(), reads: reads.clone(),
}; };
let stream = Stream::new(recorder, remote_addr_for_test()); let stream = Stream::new(recorder, remote_addr_for_test(), PoolReturner::none());
let mut deadline_stream = DeadlineStream::new(stream, None); let mut deadline_stream = DeadlineStream::new(stream, None);
let mut buf = [0u8; 1]; let mut buf = [0u8; 1];
for _ in 0..8193 { for _ in 0..8193 {

View File

@@ -1,4 +1,5 @@
use crate::error::Error; use crate::error::Error;
use crate::pool::PoolReturner;
use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream}; use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream};
use crate::unit::Unit; use crate::unit::Unit;
use crate::ReadWrite; use crate::ReadWrite;
@@ -55,6 +56,7 @@ pub(crate) fn make_response(
Ok(Stream::new( Ok(Stream::new(
ReadOnlyStream::new(buf), ReadOnlyStream::new(buf),
remote_addr_for_test(), remote_addr_for_test(),
PoolReturner::none(),
)) ))
} }
@@ -103,6 +105,7 @@ impl Recorder {
Stream::new( Stream::new(
TestStream::new(cursor, self.clone()), TestStream::new(cursor, self.clone()),
remote_addr_for_test(), remote_addr_for_test(),
PoolReturner::none(),
) )
} }
} }