diff --git a/src/agent.rs b/src/agent.rs index 48493a8..e6f0ab2 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -57,6 +57,9 @@ impl AgentState { jar: CookieJar::new(), } } + pub fn pool(&mut self) -> &mut ConnectionPool { + &mut self.pool + } } impl Agent { diff --git a/src/error.rs b/src/error.rs index 38394b5..35cb6d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,9 +48,7 @@ impl Error { /// For synthetic responses, this is the status text. pub fn status_text(&self) -> &str { match self { - Error::BadUrl(_) => { - "Bad URL" - } + Error::BadUrl(_) => "Bad URL", Error::UnknownScheme(_) => "Unknown Scheme", Error::DnsFailed(_) => "Dns Failed", Error::ConnectionFailed(_) => "Connection Failed", diff --git a/src/pool.rs b/src/pool.rs index c6e8984..fcf9104 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -6,7 +6,8 @@ use url::Url; #[derive(Default, Debug)] pub struct ConnectionPool { - recycle: HashMap, + // the actual pooled connection. however only one per hostname:port. + recycle: HashMap, } impl ConnectionPool { @@ -17,25 +18,61 @@ impl ConnectionPool { } pub fn try_get_connection(&mut self, url: &Url) -> Option { - self.recycle.remove(url) + self.recycle.remove(&PoolKey::new(url)) + } +} + +#[derive(Debug, PartialEq, Clone, Eq, Hash)] +struct PoolKey { + hostname: String, + port: u16, +} + +impl PoolKey { + fn new(url: &Url) -> Self { + PoolKey { + hostname: url.host_str().unwrap_or("localhost").into(), + port: url.port_or_known_default().unwrap_or(0), + } } } pub struct PoolReturnRead { unit: Option, + // pointer to underlying stream + stream: *mut Stream, + // wrapped reader around the same stream reader: Option, } impl PoolReturnRead { - pub fn new(unit: Option, reader: R) -> Self { + pub fn new(unit: Option, stream: *mut Stream, reader: R) -> Self { PoolReturnRead { unit, + stream, reader: Some(reader), } } fn return_connection(&mut self) { - if let Some(_unit) = self.unit.take() {} + if let Some(unit) = self.unit.take() { + // this frees up the wrapper type around the Stream so + // we can safely bring the stream pointer back. + self.reader.take(); + if self.stream.is_null() { + return; + } + let state = &mut unit.agent.lock().unwrap(); + if let Some(agent) = state.as_mut() { + unsafe { + let stream = *Box::from_raw(self.stream); + // insert back into pool + let key = PoolKey::new(&unit.url); + agent.pool().recycle.insert(key, stream); + } + }; + self.stream = ::std::ptr::null_mut(); + } } fn do_read(&mut self, buf: &mut [u8]) -> IoResult { diff --git a/src/response.rs b/src/response.rs index 4009e19..024a5c5 100644 --- a/src/response.rs +++ b/src/response.rs @@ -264,18 +264,23 @@ impl Response { .and_then(|l| l.parse::().ok()) }; - let reader = self.stream.expect("No reader in response?!"); + let stream = Box::new(self.stream.expect("No reader in response?!")); + let stream_ptr = Box::into_raw(stream); + let yolo = YoloRead { stream: stream_ptr }; let unit = self.unit; - // figure out how to make a reader match (is_chunked && !is_head, len) { - (true, _) => { - Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(reader))) as Box - } - (false, Some(len)) => { - Box::new(PoolReturnRead::new(unit, LimitedRead::new(reader, len))) - } - (false, None) => Box::new(PoolReturnRead::new(unit, reader)) as Box, + (true, _) => Box::new(PoolReturnRead::new( + unit, + stream_ptr, + ChunkDecoder::new(yolo), + )) as Box, + (false, Some(len)) => Box::new(PoolReturnRead::new( + unit, + stream_ptr, + LimitedRead::new(yolo, len), + )), + (false, None) => Box::new(yolo), } } @@ -484,14 +489,33 @@ fn read_next_line(reader: &mut R) -> IoResult { } } +struct YoloRead { + stream: *mut Stream, +} + +impl Read for YoloRead { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + unsafe { + if self.stream.is_null() { + return Ok(0); + } + let amount = (*self.stream).read(buf)?; + if amount == 0 { + self.stream = ::std::ptr::null_mut(); + } + Ok(amount) + } + } +} + struct LimitedRead { - reader: Stream, + reader: YoloRead, limit: usize, position: usize, } impl LimitedRead { - fn new(reader: Stream, limit: usize) -> Self { + fn new(reader: YoloRead, limit: usize) -> Self { LimitedRead { reader, limit,