Split response body type into own function (#566)
This allows removing the hack where we create a Response with an empty `reader`, then immediately mutate it to set the real reader. It also happens to allow us to get rid of 3 fields of Response that were only used to pass information to `stream_to_reader`. I've tried to keep the structure and logic of the body calculation as close to the same as possible, for ease of review and to avoid introducing bugs. I think there are some followup fixes we can make to the logic, which will be made easier by having it in a self contained function.
This commit is contained in:
committed by
GitHub
parent
cf687381bd
commit
d8225b22ed
@@ -150,14 +150,14 @@ impl Header {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_header<'a, 'b>(headers: &'b [Header], name: &'a str) -> Option<&'b str> {
|
pub fn get_header<'h>(headers: &'h [Header], name: &str) -> Option<&'h str> {
|
||||||
headers
|
headers
|
||||||
.iter()
|
.iter()
|
||||||
.find(|h| h.is_name(name))
|
.find(|h| h.is_name(name))
|
||||||
.and_then(|h| h.value())
|
.and_then(|h| h.value())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_all_headers<'a, 'b>(headers: &'b [Header], name: &'a str) -> Vec<&'b str> {
|
pub fn get_all_headers<'h>(headers: &'h [Header], name: &str) -> Vec<&'h str> {
|
||||||
headers
|
headers
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|h| h.is_name(name))
|
.filter(|h| h.is_name(name))
|
||||||
|
|||||||
@@ -35,6 +35,13 @@ const INTO_STRING_LIMIT: usize = 10 * 1_024 * 1_024;
|
|||||||
const MAX_HEADER_SIZE: usize = 100 * 1_024;
|
const MAX_HEADER_SIZE: usize = 100 * 1_024;
|
||||||
const MAX_HEADER_COUNT: usize = 100;
|
const MAX_HEADER_COUNT: usize = 100;
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
enum BodyType {
|
||||||
|
LengthDelimited(usize),
|
||||||
|
Chunked,
|
||||||
|
CloseDelimited,
|
||||||
|
}
|
||||||
|
|
||||||
/// Response instances are created as results of firing off requests.
|
/// Response instances are created as results of firing off requests.
|
||||||
///
|
///
|
||||||
/// The `Response` is used to read response headers and decide what to do with the body.
|
/// The `Response` is used to read response headers and decide what to do with the body.
|
||||||
@@ -67,8 +74,6 @@ pub struct Response {
|
|||||||
index: ResponseStatusIndex,
|
index: ResponseStatusIndex,
|
||||||
status: u16,
|
status: u16,
|
||||||
headers: Vec<Header>,
|
headers: Vec<Header>,
|
||||||
// Boxed to avoid taking up too much size.
|
|
||||||
unit: Box<Unit>,
|
|
||||||
reader: Box<dyn Read + Send + Sync + 'static>,
|
reader: Box<dyn Read + Send + Sync + 'static>,
|
||||||
/// The socket address of the server that sent the response.
|
/// The socket address of the server that sent the response.
|
||||||
remote_addr: SocketAddr,
|
remote_addr: SocketAddr,
|
||||||
@@ -78,11 +83,6 @@ pub struct Response {
|
|||||||
///
|
///
|
||||||
/// If this response was not redirected, the history is empty.
|
/// If this response was not redirected, the history is empty.
|
||||||
pub(crate) history: Vec<Url>,
|
pub(crate) history: Vec<Url>,
|
||||||
/// The Content-Length value. The header itself may have been removed due to
|
|
||||||
/// the automatic decompression system.
|
|
||||||
length: Option<usize>,
|
|
||||||
/// The compression type of the response body.
|
|
||||||
compression: Option<Compression>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// index into status_line where we split: HTTP/1.1 200 OK
|
/// index into status_line where we split: HTTP/1.1 200 OK
|
||||||
@@ -271,38 +271,59 @@ impl Response {
|
|||||||
self.reader
|
self.reader
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stream_to_reader(&self, stream: DeadlineStream) -> Box<dyn Read + Send + Sync + 'static> {
|
fn body_type(
|
||||||
//
|
request_method: &str,
|
||||||
let is_http10 = self.http_version().eq_ignore_ascii_case("HTTP/1.0");
|
response_status: u16,
|
||||||
let is_close = self
|
response_version: &str,
|
||||||
.header("connection")
|
headers: &[Header],
|
||||||
|
) -> BodyType {
|
||||||
|
let is_http10 = response_version.eq_ignore_ascii_case("HTTP/1.0");
|
||||||
|
let is_close = get_header(headers, "connection")
|
||||||
.map(|c| c.eq_ignore_ascii_case("close"))
|
.map(|c| c.eq_ignore_ascii_case("close"))
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
let is_head = self.unit.is_head();
|
let is_head = request_method.eq_ignore_ascii_case("head");
|
||||||
let has_no_body = is_head
|
let has_no_body = is_head
|
||||||
|| match self.status {
|
|| match response_status {
|
||||||
204 | 304 => true,
|
204 | 304 => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
};
|
};
|
||||||
|
|
||||||
let is_chunked = self
|
let is_chunked = get_header(headers, "transfer-encoding")
|
||||||
.header("transfer-encoding")
|
|
||||||
.map(|enc| !enc.is_empty()) // whatever it says, do chunked
|
.map(|enc| !enc.is_empty()) // whatever it says, do chunked
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
let use_chunked = !is_http10 && !has_no_body && is_chunked;
|
let use_chunked = !is_http10 && !has_no_body && is_chunked;
|
||||||
|
|
||||||
let limit_bytes = if is_http10 || is_close {
|
if use_chunked {
|
||||||
None
|
return BodyType::Chunked;
|
||||||
|
}
|
||||||
|
|
||||||
|
if has_no_body {
|
||||||
|
return BodyType::LengthDelimited(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let length = get_header(headers, "content-length").and_then(|v| v.parse::<usize>().ok());
|
||||||
|
|
||||||
|
if is_http10 || is_close {
|
||||||
|
BodyType::CloseDelimited
|
||||||
} else if has_no_body {
|
} else if has_no_body {
|
||||||
// head requests never have a body
|
// head requests never have a body
|
||||||
Some(0)
|
BodyType::LengthDelimited(0)
|
||||||
} else {
|
} else {
|
||||||
self.length
|
match length {
|
||||||
};
|
Some(n) => BodyType::LengthDelimited(n),
|
||||||
|
None => BodyType::CloseDelimited,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let unit = &self.unit;
|
fn stream_to_reader(
|
||||||
|
stream: DeadlineStream,
|
||||||
|
unit: &Unit,
|
||||||
|
body_type: BodyType,
|
||||||
|
compression: Option<Compression>,
|
||||||
|
) -> Box<dyn Read + Send + Sync + 'static> {
|
||||||
let inner = stream.inner_ref();
|
let inner = stream.inner_ref();
|
||||||
let result = inner.set_read_timeout(unit.agent.config.timeout_read);
|
let result = inner.set_read_timeout(unit.agent.config.timeout_read);
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
@@ -310,11 +331,11 @@ impl Response {
|
|||||||
}
|
}
|
||||||
let buffer_len = inner.buffer().len();
|
let buffer_len = inner.buffer().len();
|
||||||
|
|
||||||
let body_reader: Box<dyn Read + Send + Sync> = match (use_chunked, limit_bytes) {
|
let body_reader: Box<dyn Read + Send + Sync> = match body_type {
|
||||||
// Chunked responses have an unknown length, but do have an end of body
|
// Chunked responses have an unknown length, but do have an end of body
|
||||||
// marker. When we encounter the marker, we can return the underlying stream
|
// marker. When we encounter the marker, we can return the underlying stream
|
||||||
// to the connection pool.
|
// to the connection pool.
|
||||||
(true, _) => {
|
BodyType::Chunked => {
|
||||||
debug!("Chunked body in response");
|
debug!("Chunked body in response");
|
||||||
Box::new(PoolReturnRead::new(
|
Box::new(PoolReturnRead::new(
|
||||||
&unit.agent,
|
&unit.agent,
|
||||||
@@ -325,7 +346,7 @@ impl Response {
|
|||||||
// Responses with a content-length header means we should limit the reading
|
// Responses with a content-length header means we should limit the reading
|
||||||
// of the body to the number of bytes in the header. Once done, we can
|
// of the body to the number of bytes in the header. Once done, we can
|
||||||
// return the underlying stream to the connection pool.
|
// return the underlying stream to the connection pool.
|
||||||
(false, Some(len)) => {
|
BodyType::LengthDelimited(len) => {
|
||||||
let mut pooler =
|
let mut pooler =
|
||||||
PoolReturnRead::new(&unit.agent, &unit.url, LimitedRead::new(stream, len));
|
PoolReturnRead::new(&unit.agent, &unit.url, LimitedRead::new(stream, len));
|
||||||
|
|
||||||
@@ -341,13 +362,13 @@ impl Response {
|
|||||||
Box::new(pooler)
|
Box::new(pooler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(false, None) => {
|
BodyType::CloseDelimited => {
|
||||||
debug!("Body of unknown size - read until socket close");
|
debug!("Body of unknown size - read until socket close");
|
||||||
Box::new(stream)
|
Box::new(stream)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.compression {
|
match compression {
|
||||||
None => body_reader,
|
None => body_reader,
|
||||||
Some(c) => c.wrap_reader(body_reader),
|
Some(c) => c.wrap_reader(body_reader),
|
||||||
}
|
}
|
||||||
@@ -511,6 +532,7 @@ impl Response {
|
|||||||
// The status line we can ignore non-utf8 chars and parse as_str_lossy().
|
// The status line we can ignore non-utf8 chars and parse as_str_lossy().
|
||||||
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
|
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
|
||||||
let (index, status) = parse_status_line(status_line.as_str())?;
|
let (index, status) = parse_status_line(status_line.as_str())?;
|
||||||
|
let http_version = &status_line.as_str()[0..index.http_version];
|
||||||
|
|
||||||
let mut headers: Vec<Header> = Vec::new();
|
let mut headers: Vec<Header> = Vec::new();
|
||||||
while headers.len() <= MAX_HEADER_COUNT {
|
while headers.len() <= MAX_HEADER_COUNT {
|
||||||
@@ -529,8 +551,6 @@ impl Response {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let length = get_header(&headers, "content-length").and_then(|v| v.parse::<usize>().ok());
|
|
||||||
|
|
||||||
let compression =
|
let compression =
|
||||||
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);
|
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);
|
||||||
|
|
||||||
@@ -539,22 +559,21 @@ impl Response {
|
|||||||
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
|
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let body_type = Self::body_type(&unit.method, status, http_version, &headers);
|
||||||
|
let reader = Self::stream_to_reader(stream, &unit, body_type, compression);
|
||||||
|
|
||||||
let url = unit.url.clone();
|
let url = unit.url.clone();
|
||||||
|
|
||||||
let mut response = Response {
|
let response = Response {
|
||||||
url,
|
url,
|
||||||
status_line,
|
status_line,
|
||||||
index,
|
index,
|
||||||
status,
|
status,
|
||||||
headers,
|
headers,
|
||||||
unit: Box::new(unit),
|
reader,
|
||||||
reader: Box::new(Cursor::new(vec![])),
|
|
||||||
remote_addr,
|
remote_addr,
|
||||||
history: vec![],
|
history: vec![],
|
||||||
length,
|
|
||||||
compression,
|
|
||||||
};
|
};
|
||||||
response.reader = response.stream_to_reader(stream);
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -109,10 +109,6 @@ impl Unit {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_head(&self) -> bool {
|
|
||||||
self.method.eq_ignore_ascii_case("head")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn resolver(&self) -> ArcResolver {
|
pub fn resolver(&self) -> ArcResolver {
|
||||||
self.agent.state.resolver.clone()
|
self.agent.state.resolver.clone()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user