diff --git a/src/response.rs b/src/response.rs index b108897..15ccda0 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,4 +1,4 @@ -use std::io::{self, Read}; +use std::io::{self, Cursor, Read}; use std::str::FromStr; use std::{fmt, io::BufRead}; @@ -67,8 +67,7 @@ pub struct Response { headers: Vec
, // Boxed to avoid taking up too much size. unit: Box, - // Boxed to avoid taking up too much size. - stream: Box, + reader: Box, /// The redirect history of this response, if any. The history starts with /// the first response received and ends with the response immediately /// previous to this one. @@ -259,7 +258,11 @@ impl Response { /// # Ok(()) /// # } /// ``` - pub fn into_reader(self) -> impl Read + Send { + pub fn into_reader(self) -> Box { + self.reader + } + + fn stream_to_reader(&self, stream: DeadlineStream) -> Box { // let is_http10 = self.http_version().eq_ignore_ascii_case("HTTP/1.0"); let is_close = self @@ -290,26 +293,33 @@ impl Response { self.length }; - let stream = self.stream; - let unit = self.unit; - let result = stream.set_read_timeout(unit.agent.config.timeout_read); + let unit = &self.unit; + let inner = stream.inner_ref(); + let result = inner.set_read_timeout(unit.agent.config.timeout_read); if let Err(e) = result { - return Box::new(ErrorReader(e)) as Box; + return Box::new(ErrorReader(e)) as Box; } - let deadline = unit.deadline; - let stream = DeadlineStream::new(*stream, deadline); + let buffer_len = inner.buffer().len(); - let body_reader: Box = match (use_chunked, limit_bytes) { + let body_reader: Box = match (use_chunked, limit_bytes) { (true, _) => Box::new(PoolReturnRead::new( &unit.agent, &unit.url, ChunkDecoder::new(stream), )), - (false, Some(len)) => Box::new(PoolReturnRead::new( - &unit.agent, - &unit.url, - LimitedRead::new(stream, len), - )), + (false, Some(len)) => { + let mut pooler = + PoolReturnRead::new(&unit.agent, &unit.url, LimitedRead::new(stream, len)); + if len <= buffer_len { + let mut buf = vec![0; len]; + pooler + .read_exact(&mut buf) + .expect("failed to read exact buffer length from stream"); + Box::new(Cursor::new(buf)) + } else { + Box::new(pooler) + } + } (false, None) => Box::new(stream), }; @@ -506,18 +516,20 @@ impl Response { let url = unit.url.clone(); - Ok(Response { + let mut response = Response { url, status_line, index, status, headers, unit: Box::new(unit), - stream: Box::new(stream.into()), + reader: Box::new(Cursor::new(vec![])), history: vec![], length, compression, - }) + }; + response.reader = response.stream_to_reader(stream); + Ok(response) } #[cfg(test)] @@ -554,7 +566,10 @@ impl Compression { /// Wrap the raw reader with a decompressing reader #[allow(unused_variables)] // when no features enabled, reader is unused (unreachable) - fn wrap_reader(self, reader: Box) -> Box { + fn wrap_reader( + self, + reader: Box, + ) -> Box { match self { #[cfg(feature = "brotli")] Compression::Brotli => Box::new(BrotliDecoder::new(reader, 4096)), diff --git a/src/stream.rs b/src/stream.rs index dadd98b..6497511 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -50,7 +50,7 @@ impl ReadWrite for Box { // after the provided deadline, and sets timeouts on the underlying // TcpStream to ensure read() doesn't block beyond the deadline. // When the From trait is used to turn a DeadlineStream back into a -// Stream (by PoolReturningRead), the timeouts are removed. +// Stream (by PoolReturnRead), the timeouts are removed. pub(crate) struct DeadlineStream { stream: Stream, deadline: Option, @@ -60,6 +60,10 @@ impl DeadlineStream { pub(crate) fn new(stream: Stream, deadline: Option) -> Self { DeadlineStream { stream, deadline } } + + pub(crate) fn inner_ref(&self) -> &Stream { + &self.stream + } } impl From for Stream { @@ -184,6 +188,10 @@ impl Stream { stream } + pub(crate) fn buffer(&self) -> &[u8] { + self.inner.buffer() + } + fn from_tcp_stream(t: TcpStream) -> Stream { Stream::logged_create(Stream { inner: BufReader::new(Box::new(t)),