@@ -302,6 +302,7 @@ impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use crate::stream::remote_addr_for_test;
|
||||||
use crate::ReadWrite;
|
use crate::ReadWrite;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -311,7 +312,7 @@ mod tests {
|
|||||||
|
|
||||||
impl NoopStream {
|
impl NoopStream {
|
||||||
fn stream() -> Stream {
|
fn stream() -> Stream {
|
||||||
Stream::new(NoopStream)
|
Stream::new(NoopStream, remote_addr_for_test())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::io::{self, Cursor, Read};
|
use std::io::{self, Cursor, Read};
|
||||||
|
use std::net::SocketAddr;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::{fmt, io::BufRead};
|
use std::{fmt, io::BufRead};
|
||||||
|
|
||||||
@@ -69,6 +70,8 @@ pub struct Response {
|
|||||||
// Boxed to avoid taking up too much size.
|
// Boxed to avoid taking up too much size.
|
||||||
unit: Box<Unit>,
|
unit: Box<Unit>,
|
||||||
reader: Box<dyn Read + Send + Sync + 'static>,
|
reader: Box<dyn Read + Send + Sync + 'static>,
|
||||||
|
/// The socket address of the server that sent the response.
|
||||||
|
remote_addr: SocketAddr,
|
||||||
/// The redirect history of this response, if any. The history starts with
|
/// The redirect history of this response, if any. The history starts with
|
||||||
/// the first response received and ends with the response immediately
|
/// the first response received and ends with the response immediately
|
||||||
/// previous to this one.
|
/// previous to this one.
|
||||||
@@ -223,6 +226,11 @@ impl Response {
|
|||||||
charset_from_content_type(self.header("content-type"))
|
charset_from_content_type(self.header("content-type"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The socket address of the server that sent the response.
|
||||||
|
pub fn remote_addr(&self) -> SocketAddr {
|
||||||
|
self.remote_addr
|
||||||
|
}
|
||||||
|
|
||||||
/// Turn this response into a `impl Read` of the body.
|
/// Turn this response into a `impl Read` of the body.
|
||||||
///
|
///
|
||||||
/// 1. If `Transfer-Encoding: chunked`, the returned reader will unchunk it
|
/// 1. If `Transfer-Encoding: chunked`, the returned reader will unchunk it
|
||||||
@@ -495,6 +503,7 @@ impl Response {
|
|||||||
///
|
///
|
||||||
/// assert_eq!(resp.status(), 401);
|
/// assert_eq!(resp.status(), 401);
|
||||||
pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result<Response, Error> {
|
pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result<Response, Error> {
|
||||||
|
let remote_addr = stream.remote_addr;
|
||||||
//
|
//
|
||||||
// HTTP/1.1 200 OK\r\n
|
// HTTP/1.1 200 OK\r\n
|
||||||
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
|
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
|
||||||
@@ -540,6 +549,7 @@ impl Response {
|
|||||||
headers,
|
headers,
|
||||||
unit: Box::new(unit),
|
unit: Box::new(unit),
|
||||||
reader: Box::new(Cursor::new(vec![])),
|
reader: Box::new(Cursor::new(vec![])),
|
||||||
|
remote_addr,
|
||||||
history: vec![],
|
history: vec![],
|
||||||
length,
|
length,
|
||||||
compression,
|
compression,
|
||||||
@@ -668,7 +678,8 @@ impl FromStr for Response {
|
|||||||
/// # }
|
/// # }
|
||||||
/// ```
|
/// ```
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
let stream = Stream::new(ReadOnlyStream::new(s.into()));
|
let remote_addr = "0.0.0.0:0".parse().unwrap();
|
||||||
|
let stream = Stream::new(ReadOnlyStream::new(s.into()), remote_addr);
|
||||||
let request_url = "https://example.com".parse().unwrap();
|
let request_url = "https://example.com".parse().unwrap();
|
||||||
let request_reader = SizedReader {
|
let request_reader = SizedReader {
|
||||||
size: crate::body::BodySize::Empty,
|
size: crate::body::BodySize::Empty,
|
||||||
@@ -1029,7 +1040,10 @@ mod tests {
|
|||||||
OK",
|
OK",
|
||||||
);
|
);
|
||||||
let v = cow.to_vec();
|
let v = cow.to_vec();
|
||||||
let s = Stream::new(ReadOnlyStream::new(v));
|
let s = Stream::new(
|
||||||
|
ReadOnlyStream::new(v),
|
||||||
|
crate::stream::remote_addr_for_test(),
|
||||||
|
);
|
||||||
let request_url = "https://example.com".parse().unwrap();
|
let request_url = "https://example.com".parse().unwrap();
|
||||||
let request_reader = SizedReader {
|
let request_reader = SizedReader {
|
||||||
size: crate::body::BodySize::Empty,
|
size: crate::body::BodySize::Empty,
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ pub trait TlsConnector: Send + Sync {
|
|||||||
|
|
||||||
pub(crate) struct Stream {
|
pub(crate) struct Stream {
|
||||||
inner: BufReader<Box<dyn ReadWrite>>,
|
inner: BufReader<Box<dyn ReadWrite>>,
|
||||||
|
/// The remote address the stream is connected to.
|
||||||
|
pub(crate) remote_addr: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
|
impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
|
||||||
@@ -177,9 +179,10 @@ impl fmt::Debug for Stream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Stream {
|
impl Stream {
|
||||||
pub(crate) fn new(t: impl ReadWrite) -> Stream {
|
pub(crate) fn new(t: impl ReadWrite, remote_addr: SocketAddr) -> Stream {
|
||||||
Stream::logged_create(Stream {
|
Stream::logged_create(Stream {
|
||||||
inner: BufReader::new(Box::new(t)),
|
inner: BufReader::new(Box::new(t)),
|
||||||
|
remote_addr,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,12 +195,6 @@ impl Stream {
|
|||||||
self.inner.buffer()
|
self.inner.buffer()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_tcp_stream(t: TcpStream) -> Stream {
|
|
||||||
Stream::logged_create(Stream {
|
|
||||||
inner: BufReader::new(Box::new(t)),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the server has closed a stream by performing a one-byte
|
// Check if the server has closed a stream by performing a one-byte
|
||||||
// non-blocking read. If this returns EOF, the server has closed the
|
// non-blocking read. If this returns EOF, the server has closed the
|
||||||
// connection: return true. If this returns a successful read, there are
|
// connection: return true. If this returns a successful read, there are
|
||||||
@@ -307,20 +304,25 @@ pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error>
|
|||||||
//
|
//
|
||||||
let port = unit.url.port().unwrap_or(80);
|
let port = unit.url.port().unwrap_or(80);
|
||||||
|
|
||||||
connect_host(unit, hostname, port).map(Stream::from_tcp_stream)
|
connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
|
pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
|
||||||
let port = unit.url.port().unwrap_or(443);
|
let port = unit.url.port().unwrap_or(443);
|
||||||
|
|
||||||
let sock = connect_host(unit, hostname, port)?;
|
let (sock, remote_addr) = connect_host(unit, hostname, port)?;
|
||||||
|
|
||||||
let tls_conf = &unit.agent.config.tls_config;
|
let tls_conf = &unit.agent.config.tls_config;
|
||||||
let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
|
let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
|
||||||
Ok(Stream::new(https_stream))
|
Ok(Stream::new(https_stream, remote_addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<TcpStream, Error> {
|
/// If successful, returns a `TcpStream` and the remote address it is connected to.
|
||||||
|
pub(crate) fn connect_host(
|
||||||
|
unit: &Unit,
|
||||||
|
hostname: &str,
|
||||||
|
port: u16,
|
||||||
|
) -> Result<(TcpStream, SocketAddr), Error> {
|
||||||
let connect_deadline: Option<Instant> =
|
let connect_deadline: Option<Instant> =
|
||||||
if let Some(timeout_connect) = unit.agent.config.timeout_connect {
|
if let Some(timeout_connect) = unit.agent.config.timeout_connect {
|
||||||
Instant::now().checked_add(timeout_connect)
|
Instant::now().checked_add(timeout_connect)
|
||||||
@@ -347,7 +349,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
let proto = proxy.as_ref().map(|proxy| proxy.proto);
|
let proto = proxy.as_ref().map(|proxy| proxy.proto);
|
||||||
|
|
||||||
let mut any_err = None;
|
let mut any_err = None;
|
||||||
let mut any_stream = None;
|
let mut any_stream_and_addr = None;
|
||||||
// Find the first sock_addr that accepts a connection
|
// Find the first sock_addr that accepts a connection
|
||||||
for sock_addr in sock_addrs {
|
for sock_addr in sock_addrs {
|
||||||
// ensure connect timeout or overall timeout aren't yet hit.
|
// ensure connect timeout or overall timeout aren't yet hit.
|
||||||
@@ -376,15 +378,15 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Ok(stream) = stream {
|
if let Ok(stream) = stream {
|
||||||
any_stream = Some(stream);
|
any_stream_and_addr = Some((stream, sock_addr));
|
||||||
break;
|
break;
|
||||||
} else if let Err(err) = stream {
|
} else if let Err(err) = stream {
|
||||||
any_err = Some(err);
|
any_err = Some(err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut stream = if let Some(stream) = any_stream {
|
let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
|
||||||
stream
|
stream_and_addr
|
||||||
} else if let Some(e) = any_err {
|
} else if let Some(e) = any_err {
|
||||||
return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
|
return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
|
||||||
} else {
|
} else {
|
||||||
@@ -425,7 +427,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result<Tcp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(stream)
|
Ok((stream, remote_addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "socks-proxy")]
|
#[cfg(feature = "socks-proxy")]
|
||||||
@@ -611,6 +613,12 @@ pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
|
|||||||
Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
|
Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn remote_addr_for_test() -> SocketAddr {
|
||||||
|
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||||
|
SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -662,7 +670,7 @@ mod tests {
|
|||||||
let recorder = ReadRecorder {
|
let recorder = ReadRecorder {
|
||||||
reads: reads.clone(),
|
reads: reads.clone(),
|
||||||
};
|
};
|
||||||
let stream = Stream::new(recorder);
|
let stream = Stream::new(recorder, remote_addr_for_test());
|
||||||
let mut deadline_stream = DeadlineStream::new(stream, None);
|
let mut deadline_stream = DeadlineStream::new(stream, None);
|
||||||
let mut buf = [0u8; 1];
|
let mut buf = [0u8; 1];
|
||||||
for _ in 0..8193 {
|
for _ in 0..8193 {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::stream::{ReadOnlyStream, Stream};
|
use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream};
|
||||||
use crate::unit::Unit;
|
use crate::unit::Unit;
|
||||||
use crate::ReadWrite;
|
use crate::ReadWrite;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
@@ -52,7 +52,10 @@ pub(crate) fn make_response(
|
|||||||
}
|
}
|
||||||
write!(&mut buf, "\r\n").ok();
|
write!(&mut buf, "\r\n").ok();
|
||||||
buf.append(&mut body);
|
buf.append(&mut body);
|
||||||
Ok(Stream::new(ReadOnlyStream::new(buf)))
|
Ok(Stream::new(
|
||||||
|
ReadOnlyStream::new(buf),
|
||||||
|
remote_addr_for_test(),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn resolve_handler(unit: &Unit) -> Result<Stream, Error> {
|
pub(crate) fn resolve_handler(unit: &Unit) -> Result<Stream, Error> {
|
||||||
@@ -97,7 +100,10 @@ impl Recorder {
|
|||||||
|
|
||||||
fn stream(&self) -> Stream {
|
fn stream(&self) -> Stream {
|
||||||
let cursor = Cursor::new(b"HTTP/1.1 200 OK\r\n\r\n");
|
let cursor = Cursor::new(b"HTTP/1.1 200 OK\r\n\r\n");
|
||||||
Stream::new(TestStream::new(cursor, self.clone()))
|
Stream::new(
|
||||||
|
TestStream::new(cursor, self.clone()),
|
||||||
|
remote_addr_for_test(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user