Buffer short responses (2) (#531)

* Response: build reader at construction time

* Remove unwrapping/rewrapping DeadlineStream

* Let into_reader() provide Sync + 'static

* Prebuffer Vec use known capacity

Co-authored-by: Martin Algesten <martin@algesten.se>
This commit is contained in:
Jacob Hoffman-Andrews
2022-07-10 02:12:50 -07:00
committed by GitHub
parent 9908c446d6
commit f14fc45179
2 changed files with 44 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
use std::io::{self, Read}; use std::io::{self, Cursor, Read};
use std::str::FromStr; use std::str::FromStr;
use std::{fmt, io::BufRead}; use std::{fmt, io::BufRead};
@@ -67,8 +67,7 @@ pub struct Response {
headers: Vec<Header>, headers: Vec<Header>,
// Boxed to avoid taking up too much size. // Boxed to avoid taking up too much size.
unit: Box<Unit>, unit: Box<Unit>,
// Boxed to avoid taking up too much size. reader: Box<dyn Read + Send + Sync + 'static>,
stream: Box<Stream>,
/// The redirect history of this response, if any. The history starts with /// The redirect history of this response, if any. The history starts with
/// the first response received and ends with the response immediately /// the first response received and ends with the response immediately
/// previous to this one. /// previous to this one.
@@ -259,7 +258,11 @@ impl Response {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub fn into_reader(self) -> impl Read + Send { pub fn into_reader(self) -> Box<dyn Read + Send + Sync + 'static> {
self.reader
}
fn stream_to_reader(&self, stream: DeadlineStream) -> Box<dyn Read + Send + Sync + 'static> {
// //
let is_http10 = self.http_version().eq_ignore_ascii_case("HTTP/1.0"); let is_http10 = self.http_version().eq_ignore_ascii_case("HTTP/1.0");
let is_close = self let is_close = self
@@ -290,26 +293,33 @@ impl Response {
self.length self.length
}; };
let stream = self.stream; let unit = &self.unit;
let unit = self.unit; let inner = stream.inner_ref();
let result = stream.set_read_timeout(unit.agent.config.timeout_read); let result = inner.set_read_timeout(unit.agent.config.timeout_read);
if let Err(e) = result { if let Err(e) = result {
return Box::new(ErrorReader(e)) as Box<dyn Read + Send>; return Box::new(ErrorReader(e)) as Box<dyn Read + Send + Sync + 'static>;
} }
let deadline = unit.deadline; let buffer_len = inner.buffer().len();
let stream = DeadlineStream::new(*stream, deadline);
let body_reader: Box<dyn Read + Send> = match (use_chunked, limit_bytes) { let body_reader: Box<dyn Read + Send + Sync> = match (use_chunked, limit_bytes) {
(true, _) => Box::new(PoolReturnRead::new( (true, _) => Box::new(PoolReturnRead::new(
&unit.agent, &unit.agent,
&unit.url, &unit.url,
ChunkDecoder::new(stream), ChunkDecoder::new(stream),
)), )),
(false, Some(len)) => Box::new(PoolReturnRead::new( (false, Some(len)) => {
&unit.agent, let mut pooler =
&unit.url, PoolReturnRead::new(&unit.agent, &unit.url, LimitedRead::new(stream, len));
LimitedRead::new(stream, len), if len <= buffer_len {
)), let mut buf = vec![0; len];
pooler
.read_exact(&mut buf)
.expect("failed to read exact buffer length from stream");
Box::new(Cursor::new(buf))
} else {
Box::new(pooler)
}
}
(false, None) => Box::new(stream), (false, None) => Box::new(stream),
}; };
@@ -506,18 +516,20 @@ impl Response {
let url = unit.url.clone(); let url = unit.url.clone();
Ok(Response { let mut response = Response {
url, url,
status_line, status_line,
index, index,
status, status,
headers, headers,
unit: Box::new(unit), unit: Box::new(unit),
stream: Box::new(stream.into()), reader: Box::new(Cursor::new(vec![])),
history: vec![], history: vec![],
length, length,
compression, compression,
}) };
response.reader = response.stream_to_reader(stream);
Ok(response)
} }
#[cfg(test)] #[cfg(test)]
@@ -554,7 +566,10 @@ impl Compression {
/// Wrap the raw reader with a decompressing reader /// Wrap the raw reader with a decompressing reader
#[allow(unused_variables)] // when no features enabled, reader is unused (unreachable) #[allow(unused_variables)] // when no features enabled, reader is unused (unreachable)
fn wrap_reader(self, reader: Box<dyn Read + Send>) -> Box<dyn Read + Send> { fn wrap_reader(
self,
reader: Box<dyn Read + Send + Sync + 'static>,
) -> Box<dyn Read + Send + Sync + 'static> {
match self { match self {
#[cfg(feature = "brotli")] #[cfg(feature = "brotli")]
Compression::Brotli => Box::new(BrotliDecoder::new(reader, 4096)), Compression::Brotli => Box::new(BrotliDecoder::new(reader, 4096)),

View File

@@ -50,7 +50,7 @@ impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
// after the provided deadline, and sets timeouts on the underlying // after the provided deadline, and sets timeouts on the underlying
// TcpStream to ensure read() doesn't block beyond the deadline. // TcpStream to ensure read() doesn't block beyond the deadline.
// When the From trait is used to turn a DeadlineStream back into a // When the From trait is used to turn a DeadlineStream back into a
// Stream (by PoolReturningRead), the timeouts are removed. // Stream (by PoolReturnRead), the timeouts are removed.
pub(crate) struct DeadlineStream { pub(crate) struct DeadlineStream {
stream: Stream, stream: Stream,
deadline: Option<Instant>, deadline: Option<Instant>,
@@ -60,6 +60,10 @@ impl DeadlineStream {
pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self { pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self {
DeadlineStream { stream, deadline } DeadlineStream { stream, deadline }
} }
pub(crate) fn inner_ref(&self) -> &Stream {
&self.stream
}
} }
impl From<DeadlineStream> for Stream { impl From<DeadlineStream> for Stream {
@@ -184,6 +188,10 @@ impl Stream {
stream stream
} }
pub(crate) fn buffer(&self) -> &[u8] {
self.inner.buffer()
}
fn from_tcp_stream(t: TcpStream) -> Stream { fn from_tcp_stream(t: TcpStream) -> Stream {
Stream::logged_create(Stream { Stream::logged_create(Stream {
inner: BufReader::new(Box::new(t)), inner: BufReader::new(Box::new(t)),