From 855f20e6623471693cf67dfabb52cfdf212d496e Mon Sep 17 00:00:00 2001 From: Max von Forell Date: Mon, 3 Oct 2022 22:29:21 +0200 Subject: [PATCH] Add Response::remote_addr() (#489) Fixes #488. --- src/pool.rs | 3 ++- src/response.rs | 18 ++++++++++++++++-- src/stream.rs | 42 +++++++++++++++++++++++++----------------- src/test/mod.rs | 12 +++++++++--- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 7ea99e8..6b50ebd 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -302,6 +302,7 @@ impl> Read for PoolReturnRead { #[cfg(test)] mod tests { + use crate::stream::remote_addr_for_test; use crate::ReadWrite; use super::*; @@ -311,7 +312,7 @@ mod tests { impl NoopStream { fn stream() -> Stream { - Stream::new(NoopStream) + Stream::new(NoopStream, remote_addr_for_test()) } } diff --git a/src/response.rs b/src/response.rs index 65a81af..7f66744 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,4 +1,5 @@ use std::io::{self, Cursor, Read}; +use std::net::SocketAddr; use std::str::FromStr; use std::{fmt, io::BufRead}; @@ -69,6 +70,8 @@ pub struct Response { // Boxed to avoid taking up too much size. unit: Box, reader: Box, + /// The socket address of the server that sent the response. + remote_addr: SocketAddr, /// The redirect history of this response, if any. The history starts with /// the first response received and ends with the response immediately /// previous to this one. @@ -223,6 +226,11 @@ impl Response { charset_from_content_type(self.header("content-type")) } + /// The socket address of the server that sent the response. + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } + /// Turn this response into a `impl Read` of the body. /// /// 1. If `Transfer-Encoding: chunked`, the returned reader will unchunk it @@ -495,6 +503,7 @@ impl Response { /// /// assert_eq!(resp.status(), 401); pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result { + let remote_addr = stream.remote_addr; // // HTTP/1.1 200 OK\r\n let mut stream = stream::DeadlineStream::new(stream, unit.deadline); @@ -540,6 +549,7 @@ impl Response { headers, unit: Box::new(unit), reader: Box::new(Cursor::new(vec![])), + remote_addr, history: vec![], length, compression, @@ -668,7 +678,8 @@ impl FromStr for Response { /// # } /// ``` fn from_str(s: &str) -> Result { - let stream = Stream::new(ReadOnlyStream::new(s.into())); + let remote_addr = "0.0.0.0:0".parse().unwrap(); + let stream = Stream::new(ReadOnlyStream::new(s.into()), remote_addr); let request_url = "https://example.com".parse().unwrap(); let request_reader = SizedReader { size: crate::body::BodySize::Empty, @@ -1029,7 +1040,10 @@ mod tests { OK", ); let v = cow.to_vec(); - let s = Stream::new(ReadOnlyStream::new(v)); + let s = Stream::new( + ReadOnlyStream::new(v), + crate::stream::remote_addr_for_test(), + ); let request_url = "https://example.com".parse().unwrap(); let request_reader = SizedReader { size: crate::body::BodySize::Empty, diff --git a/src/stream.rs b/src/stream.rs index cefe48b..5d6d52f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -38,6 +38,8 @@ pub trait TlsConnector: Send + Sync { pub(crate) struct Stream { inner: BufReader>, + /// The remote address the stream is connected to. + pub(crate) remote_addr: SocketAddr, } impl ReadWrite for Box { @@ -177,9 +179,10 @@ impl fmt::Debug for Stream { } impl Stream { - pub(crate) fn new(t: impl ReadWrite) -> Stream { + pub(crate) fn new(t: impl ReadWrite, remote_addr: SocketAddr) -> Stream { Stream::logged_create(Stream { inner: BufReader::new(Box::new(t)), + remote_addr, }) } @@ -192,12 +195,6 @@ impl Stream { self.inner.buffer() } - fn from_tcp_stream(t: TcpStream) -> Stream { - Stream::logged_create(Stream { - inner: BufReader::new(Box::new(t)), - }) - } - // Check if the server has closed a stream by performing a one-byte // non-blocking read. If this returns EOF, the server has closed the // connection: return true. If this returns a successful read, there are @@ -307,20 +304,25 @@ pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result // let port = unit.url.port().unwrap_or(80); - connect_host(unit, hostname, port).map(Stream::from_tcp_stream) + connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r)) } pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result { let port = unit.url.port().unwrap_or(443); - let sock = connect_host(unit, hostname, port)?; + let (sock, remote_addr) = connect_host(unit, hostname, port)?; let tls_conf = &unit.agent.config.tls_config; let https_stream = tls_conf.connect(hostname, Box::new(sock))?; - Ok(Stream::new(https_stream)) + Ok(Stream::new(https_stream, remote_addr)) } -pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result { +/// If successful, returns a `TcpStream` and the remote address it is connected to. +pub(crate) fn connect_host( + unit: &Unit, + hostname: &str, + port: u16, +) -> Result<(TcpStream, SocketAddr), Error> { let connect_deadline: Option = if let Some(timeout_connect) = unit.agent.config.timeout_connect { Instant::now().checked_add(timeout_connect) @@ -347,7 +349,7 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result Result Result Result { Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme()))) } +#[cfg(test)] +pub(crate) fn remote_addr_for_test() -> SocketAddr { + use std::net::{Ipv4Addr, SocketAddrV4}; + SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into() +} + #[cfg(test)] mod tests { use super::*; @@ -662,7 +670,7 @@ mod tests { let recorder = ReadRecorder { reads: reads.clone(), }; - let stream = Stream::new(recorder); + let stream = Stream::new(recorder, remote_addr_for_test()); let mut deadline_stream = DeadlineStream::new(stream, None); let mut buf = [0u8; 1]; for _ in 0..8193 { diff --git a/src/test/mod.rs b/src/test/mod.rs index 349271a..eb0559a 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,5 +1,5 @@ use crate::error::Error; -use crate::stream::{ReadOnlyStream, Stream}; +use crate::stream::{remote_addr_for_test, ReadOnlyStream, Stream}; use crate::unit::Unit; use crate::ReadWrite; use once_cell::sync::Lazy; @@ -52,7 +52,10 @@ pub(crate) fn make_response( } write!(&mut buf, "\r\n").ok(); buf.append(&mut body); - Ok(Stream::new(ReadOnlyStream::new(buf))) + Ok(Stream::new( + ReadOnlyStream::new(buf), + remote_addr_for_test(), + )) } pub(crate) fn resolve_handler(unit: &Unit) -> Result { @@ -97,7 +100,10 @@ impl Recorder { fn stream(&self) -> Stream { let cursor = Cursor::new(b"HTTP/1.1 200 OK\r\n\r\n"); - Stream::new(TestStream::new(cursor, self.clone())) + Stream::new( + TestStream::new(cursor, self.clone()), + remote_addr_for_test(), + ) } }