connection pool done

This commit is contained in:
Martin Algesten
2018-06-30 17:55:11 +02:00
parent 4a5944443f
commit 552728d1d1
4 changed files with 80 additions and 18 deletions

View File

@@ -57,6 +57,9 @@ impl AgentState {
jar: CookieJar::new(), jar: CookieJar::new(),
} }
} }
pub fn pool(&mut self) -> &mut ConnectionPool {
&mut self.pool
}
} }
impl Agent { impl Agent {

View File

@@ -48,9 +48,7 @@ impl Error {
/// For synthetic responses, this is the status text. /// For synthetic responses, this is the status text.
pub fn status_text(&self) -> &str { pub fn status_text(&self) -> &str {
match self { match self {
Error::BadUrl(_) => { Error::BadUrl(_) => "Bad URL",
"Bad URL"
}
Error::UnknownScheme(_) => "Unknown Scheme", Error::UnknownScheme(_) => "Unknown Scheme",
Error::DnsFailed(_) => "Dns Failed", Error::DnsFailed(_) => "Dns Failed",
Error::ConnectionFailed(_) => "Connection Failed", Error::ConnectionFailed(_) => "Connection Failed",

View File

@@ -6,7 +6,8 @@ use url::Url;
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct ConnectionPool { pub struct ConnectionPool {
recycle: HashMap<Url, Stream>, // the actual pooled connection. however only one per hostname:port.
recycle: HashMap<PoolKey, Stream>,
} }
impl ConnectionPool { impl ConnectionPool {
@@ -17,25 +18,61 @@ impl ConnectionPool {
} }
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> { pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
self.recycle.remove(url) self.recycle.remove(&PoolKey::new(url))
}
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
struct PoolKey {
hostname: String,
port: u16,
}
impl PoolKey {
fn new(url: &Url) -> Self {
PoolKey {
hostname: url.host_str().unwrap_or("localhost").into(),
port: url.port_or_known_default().unwrap_or(0),
}
} }
} }
pub struct PoolReturnRead<R: Read + Sized> { pub struct PoolReturnRead<R: Read + Sized> {
unit: Option<Unit>, unit: Option<Unit>,
// pointer to underlying stream
stream: *mut Stream,
// wrapped reader around the same stream
reader: Option<R>, reader: Option<R>,
} }
impl<R: Read + Sized> PoolReturnRead<R> { impl<R: Read + Sized> PoolReturnRead<R> {
pub fn new(unit: Option<Unit>, reader: R) -> Self { pub fn new(unit: Option<Unit>, stream: *mut Stream, reader: R) -> Self {
PoolReturnRead { PoolReturnRead {
unit, unit,
stream,
reader: Some(reader), reader: Some(reader),
} }
} }
fn return_connection(&mut self) { fn return_connection(&mut self) {
if let Some(_unit) = self.unit.take() {} if let Some(unit) = self.unit.take() {
// this frees up the wrapper type around the Stream so
// we can safely bring the stream pointer back.
self.reader.take();
if self.stream.is_null() {
return;
}
let state = &mut unit.agent.lock().unwrap();
if let Some(agent) = state.as_mut() {
unsafe {
let stream = *Box::from_raw(self.stream);
// insert back into pool
let key = PoolKey::new(&unit.url);
agent.pool().recycle.insert(key, stream);
}
};
self.stream = ::std::ptr::null_mut();
}
} }
fn do_read(&mut self, buf: &mut [u8]) -> IoResult<usize> { fn do_read(&mut self, buf: &mut [u8]) -> IoResult<usize> {

View File

@@ -264,18 +264,23 @@ impl Response {
.and_then(|l| l.parse::<usize>().ok()) .and_then(|l| l.parse::<usize>().ok())
}; };
let reader = self.stream.expect("No reader in response?!"); let stream = Box::new(self.stream.expect("No reader in response?!"));
let stream_ptr = Box::into_raw(stream);
let yolo = YoloRead { stream: stream_ptr };
let unit = self.unit; let unit = self.unit;
// figure out how to make a reader
match (is_chunked && !is_head, len) { match (is_chunked && !is_head, len) {
(true, _) => { (true, _) => Box::new(PoolReturnRead::new(
Box::new(PoolReturnRead::new(unit, ChunkDecoder::new(reader))) as Box<Read> unit,
} stream_ptr,
(false, Some(len)) => { ChunkDecoder::new(yolo),
Box::new(PoolReturnRead::new(unit, LimitedRead::new(reader, len))) )) as Box<Read>,
} (false, Some(len)) => Box::new(PoolReturnRead::new(
(false, None) => Box::new(PoolReturnRead::new(unit, reader)) as Box<Read>, unit,
stream_ptr,
LimitedRead::new(yolo, len),
)),
(false, None) => Box::new(yolo),
} }
} }
@@ -484,14 +489,33 @@ fn read_next_line<R: Read>(reader: &mut R) -> IoResult<AsciiString> {
} }
} }
struct YoloRead {
stream: *mut Stream,
}
impl Read for YoloRead {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
unsafe {
if self.stream.is_null() {
return Ok(0);
}
let amount = (*self.stream).read(buf)?;
if amount == 0 {
self.stream = ::std::ptr::null_mut();
}
Ok(amount)
}
}
}
struct LimitedRead { struct LimitedRead {
reader: Stream, reader: YoloRead,
limit: usize, limit: usize,
position: usize, position: usize,
} }
impl LimitedRead { impl LimitedRead {
fn new(reader: Stream, limit: usize) -> Self { fn new(reader: YoloRead, limit: usize) -> Self {
LimitedRead { LimitedRead {
reader, reader,
limit, limit,