test poolable against s3

This commit is contained in:
Martin Algesten
2018-06-30 23:05:40 +02:00
parent e2686e3bd5
commit 548b5d80c2
4 changed files with 67 additions and 1 deletions

View File

@@ -325,6 +325,11 @@ impl Agent {
{ {
self.request("PATCH", path) self.request("PATCH", path)
} }
#[cfg(test)]
pub fn state(&self) -> &Arc<Mutex<Option<AgentState>>> {
&self.state
}
} }
fn basic_auth(user: &str, pass: &str) -> String { fn basic_auth(user: &str, pass: &str) -> String {

View File

@@ -20,6 +20,20 @@ impl ConnectionPool {
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> { pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
self.recycle.remove(&PoolKey::new(url)) self.recycle.remove(&PoolKey::new(url))
} }
#[cfg(test)]
pub fn len(&self) -> usize {
self.recycle.len()
}
#[cfg(test)]
pub fn get(&self, hostname: &str, port: u16) -> Option<&Stream> {
let key = PoolKey {
hostname: hostname.into(),
port,
};
self.recycle.get(&key)
}
} }
#[derive(Debug, PartialEq, Clone, Eq, Hash)] #[derive(Debug, PartialEq, Clone, Eq, Hash)]
@@ -66,12 +80,16 @@ impl<R: Read + Sized> PoolReturnRead<R> {
if let Some(agent) = state.as_mut() { if let Some(agent) = state.as_mut() {
unsafe { unsafe {
let stream = *Box::from_raw(self.stream); let stream = *Box::from_raw(self.stream);
self.stream = ::std::ptr::null_mut();
if !stream.is_poolable() {
// just let it deallocate
return;
}
// insert back into pool // insert back into pool
let key = PoolKey::new(&unit.url); let key = PoolKey::new(&unit.url);
agent.pool().recycle.insert(key, stream); agent.pool().recycle.insert(key, stream);
} }
}; };
self.stream = ::std::ptr::null_mut();
} }
} }

View File

@@ -35,6 +35,14 @@ impl ::std::fmt::Debug for Stream {
} }
impl Stream { impl Stream {
pub fn is_poolable(&self) -> bool {
match self {
Stream::Http(_) => true,
Stream::Https(_) => true,
_ => false,
}
}
#[cfg(test)] #[cfg(test)]
pub fn to_write_vec(&self) -> Vec<u8> { pub fn to_write_vec(&self) -> Vec<u8> {
match self { match self {

View File

@@ -17,3 +17,38 @@ fn read_range() {
[0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 3, 232, 0, 0, 0, 1] [0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 3, 232, 0, 0, 0, 1]
) )
} }
#[test]
fn agent_pool() {
let agent = agent().build();
// req 1
let resp = agent.get("https://s3.amazonaws.com/foosrvr/bbb.mp4")
.set("Range", "bytes=1000-1999")
.call();
assert_eq!(*resp.status(), 206);
let mut reader = resp.into_reader();
let mut buf = vec![];
// reading the entire content will return the connection to the pool
let len = reader.read_to_end(&mut buf).unwrap();
assert_eq!(len, 1000);
{
let mut lock = agent.state().lock().unwrap();
let state = lock.as_mut().unwrap();
let pool = state.pool();
assert_eq!(pool.len(), 1);
let foo = format!("{:?}", pool.get("s3.amazonaws.com", 443));
assert_eq!(foo, "Some(Stream[https])"); // not a great way of testing.
}
// req 2 should be done with a reused connection
let resp = agent.get("https://s3.amazonaws.com/foosrvr/bbb.mp4")
.set("Range", "bytes=5000-6999")
.call();
assert_eq!(*resp.status(), 206);
let mut reader = resp.into_reader();
let mut buf = vec![];
let len = reader.read_to_end(&mut buf).unwrap();
assert_eq!(len, 2000);
}