diff --git a/src/stream.rs b/src/stream.rs index aa04a40..cce442d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -2,6 +2,7 @@ use std::io::{Cursor, ErrorKind, Read, Result as IoResult, Write}; use std::net::SocketAddr; use std::net::TcpStream; use std::net::ToSocketAddrs; +use std::thread; use std::time::Duration; #[cfg(feature = "tls")] @@ -9,10 +10,15 @@ use rustls::ClientSession; #[cfg(feature = "tls")] use rustls::StreamOwned; -use crate::proxy::Proto; -use crate::proxy::Proxy; #[cfg(feature = "socks-proxy")] use socks::{Socks5Stream, ToTargetAddr}; +#[cfg(feature = "socks-proxy")] +use std::sync::mpsc::channel; +#[cfg(feature = "socks-proxy")] +use std::sync::{Arc, Condvar, Mutex}; + +use crate::proxy::Proto; +use crate::proxy::Proxy; use crate::error::Error; use crate::unit::Unit; @@ -197,7 +203,13 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result TcpStream::connect(&sock_addr), @@ -252,9 +264,10 @@ pub(crate) fn connect_host(unit: &Unit, hostname: &str, port: u16) -> Result Result { let host_addrs: Vec = format!("{}:{}", hostname, port) @@ -265,32 +278,81 @@ fn connect_socks5( if host_addrs.is_empty() { return Err(std::io::Error::new( ErrorKind::NotFound, - format!("No ip address for {}.", proxy.server), + format!("DNS lookup failed for {}:{}.", hostname, port), )); } - let host_addr = host_addrs[0].to_target_addr()?; + let stream = if timeout_connect > 0 { + let master_signal = Arc::new((Mutex::new(false), Condvar::new())); + let slave_signal = master_signal.clone(); + let (tx, rx) = channel(); + let host_addr = host_addrs[0]; + thread::spawn(move || { + let (lock, cvar) = &*slave_signal; + if tx + .send(get_socks5_stream(&proxy, &proxy_addr, &host_addr)) + .is_ok() + { + let mut done = lock.lock().unwrap(); + *done = true; + cvar.notify_one(); + } + }); - let stream = if proxy.use_authorization() { - Socks5Stream::connect_with_password( - proxy_addr, - host_addr, - &proxy.user.as_ref().unwrap(), - &proxy.password.as_ref().unwrap(), - )? - .into_inner() + let (lock, cvar) = &*master_signal; + let done = lock.lock().unwrap(); + + let done_result = cvar + .wait_timeout(done, Duration::from_millis(timeout_connect)) + .unwrap(); + let done = done_result.0; + if *done { + rx.recv().unwrap()? + } else { + return Err(std::io::Error::new( + ErrorKind::TimedOut, + format!( + "SOCKS5 proxy: {}:{} timed out connecting after {}ms.", + hostname, port, timeout_connect + ), + )); + } } else { - Socks5Stream::connect(proxy_addr, host_addr)?.into_inner() + get_socks5_stream(&proxy, &proxy_addr, &host_addrs[0])? }; Ok(stream) } +#[cfg(feature = "socks-proxy")] +fn get_socks5_stream( + proxy: &Proxy, + proxy_addr: &SocketAddr, + host_addr: &SocketAddr, +) -> Result { + if proxy.use_authorization() { + let stream = Socks5Stream::connect_with_password( + proxy_addr, + host_addr.to_target_addr()?, + &proxy.user.as_ref().unwrap(), + &proxy.password.as_ref().unwrap(), + )? + .into_inner(); + Ok(stream) + } else { + match Socks5Stream::connect(proxy_addr, host_addr.to_target_addr()?) { + Ok(socks_stream) => Ok(socks_stream.into_inner()), + Err(err) => Err(err), + } + } +} + #[cfg(not(feature = "socks-proxy"))] fn connect_socks5( - _proxy: &Proxy, - _proxy_addr: &SocketAddr, - _hostname: &str, + _proxy: Proxy, + _timeout_connect: u64, + _proxy_addr: SocketAddr, + _hostname: String, _port: u16, ) -> Result { Err(std::io::Error::new(