44
src/pool.rs
44
src/pool.rs
@@ -3,6 +3,7 @@ use std::io::{Read, Result as IoResult};
|
||||
|
||||
use crate::stream::Stream;
|
||||
use crate::unit::Unit;
|
||||
use crate::Proxy;
|
||||
|
||||
use url::Url;
|
||||
|
||||
@@ -35,8 +36,8 @@ impl ConnectionPool {
|
||||
}
|
||||
|
||||
/// How the unit::connect tries to get a pooled connection.
|
||||
pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
|
||||
let key = PoolKey::new(url);
|
||||
pub fn try_get_connection(&mut self, url: &Url, proxy: &Option<Proxy>) -> Option<Stream> {
|
||||
let key = PoolKey::new(url, proxy);
|
||||
self.remove(&key)
|
||||
}
|
||||
|
||||
@@ -87,6 +88,7 @@ struct PoolKey {
|
||||
scheme: String,
|
||||
hostname: String,
|
||||
port: Option<u16>,
|
||||
proxy: Option<Proxy>,
|
||||
}
|
||||
|
||||
use std::fmt;
|
||||
@@ -103,12 +105,13 @@ impl fmt::Debug for PoolKey {
|
||||
}
|
||||
|
||||
impl PoolKey {
|
||||
fn new(url: &Url) -> Self {
|
||||
fn new(url: &Url, proxy: &Option<Proxy>) -> Self {
|
||||
let port = url.port_or_known_default();
|
||||
PoolKey {
|
||||
scheme: url.scheme().to_string(),
|
||||
hostname: url.host_str().unwrap_or("").to_string(),
|
||||
port,
|
||||
proxy: proxy.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -116,7 +119,7 @@ impl PoolKey {
|
||||
#[test]
|
||||
fn poolkey_new() {
|
||||
// Test that PoolKey::new() does not panic on unrecognized schemes.
|
||||
PoolKey::new(&Url::parse("zzz:///example.com").unwrap());
|
||||
PoolKey::new(&Url::parse("zzz:///example.com").unwrap(), &None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -128,6 +131,7 @@ fn pool_size_limit() {
|
||||
scheme: "https".to_string(),
|
||||
hostname,
|
||||
port: Some(999),
|
||||
proxy: None,
|
||||
});
|
||||
for key in poolkeys.clone() {
|
||||
pool.add(key, Stream::Cursor(std::io::Cursor::new(vec![])));
|
||||
@@ -152,6 +156,7 @@ fn pool_duplicates_limit() {
|
||||
scheme: "https".to_string(),
|
||||
hostname,
|
||||
port: Some(999),
|
||||
proxy: None,
|
||||
});
|
||||
for key in poolkeys.clone() {
|
||||
pool.add(key.clone(), Stream::Cursor(std::io::Cursor::new(vec![])));
|
||||
@@ -165,6 +170,35 @@ fn pool_duplicates_limit() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pool_checks_proxy() {
|
||||
// Test inserting different poolkeys with same address but different proxies.
|
||||
// Each insertion should result in an additional entry in the pool.
|
||||
let mut pool = ConnectionPool::new();
|
||||
let url = Url::parse("zzz:///example.com").unwrap();
|
||||
|
||||
pool.add(
|
||||
PoolKey::new(&url, &None),
|
||||
Stream::Cursor(std::io::Cursor::new(vec![])),
|
||||
);
|
||||
assert_eq!(pool.len(), 1);
|
||||
|
||||
pool.add(
|
||||
PoolKey::new(&url, &Some(Proxy::new("localhost:9999").unwrap())),
|
||||
Stream::Cursor(std::io::Cursor::new(vec![])),
|
||||
);
|
||||
assert_eq!(pool.len(), 2);
|
||||
|
||||
pool.add(
|
||||
PoolKey::new(
|
||||
&url,
|
||||
&Some(Proxy::new("user:password@localhost:9999").unwrap()),
|
||||
),
|
||||
Stream::Cursor(std::io::Cursor::new(vec![])),
|
||||
);
|
||||
assert_eq!(pool.len(), 3);
|
||||
}
|
||||
|
||||
/// Read wrapper that returns the stream to the pool once the
|
||||
/// read is exhausted (reached a 0).
|
||||
///
|
||||
@@ -196,7 +230,7 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
|
||||
return;
|
||||
}
|
||||
// insert back into pool
|
||||
let key = PoolKey::new(&unit.url);
|
||||
let key = PoolKey::new(&unit.url, &unit.proxy);
|
||||
agent.pool().add(key, stream);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use crate::error::Error;
|
||||
|
||||
/// Proxy protocol
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub enum Proto {
|
||||
HTTPConnect,
|
||||
SOCKS5,
|
||||
}
|
||||
|
||||
/// Proxy server definition
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct Proxy {
|
||||
pub(crate) server: String,
|
||||
pub(crate) port: u32,
|
||||
|
||||
@@ -301,7 +301,7 @@ fn connect_socket(unit: &Unit, use_pooled: bool) -> Result<(Stream, bool), Error
|
||||
// The connection may have been closed by the server
|
||||
// due to idle timeout while it was sitting in the pool.
|
||||
// Loop until we find one that is still good or run out of connections.
|
||||
while let Some(stream) = agent.pool.try_get_connection(&unit.url) {
|
||||
while let Some(stream) = agent.pool.try_get_connection(&unit.url, &unit.proxy) {
|
||||
let server_closed = stream.server_closed()?;
|
||||
if !server_closed {
|
||||
return Ok((stream, true));
|
||||
|
||||
Reference in New Issue
Block a user