diff --git a/Cargo.toml b/Cargo.toml index f3ac84b..a57f17b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,9 +43,6 @@ log = "0.4.11" [dev-dependencies] serde = { version = "1", features = ["derive"] } -rayon = "1.3.0" -rayon-core = "1.7.0" -chrono = "0.4.11" env_logger = "0.8.1" [[example]] diff --git a/examples/smoke-test/main.rs b/examples/smoke-test/main.rs index b2082f9..4c7ea28 100644 --- a/examples/smoke-test/main.rs +++ b/examples/smoke-test/main.rs @@ -1,11 +1,10 @@ -use chrono::Local; -use rayon::prelude::*; - use std::io::{self, BufRead, BufReader, Read}; -use std::iter::Iterator; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; use std::time::Duration; use std::{env, error, fmt, result}; +use log::{error, info}; use ureq; #[derive(Debug)] @@ -23,12 +22,6 @@ impl From for Oops { } } -impl From for Oops { - fn from(e: rayon_core::ThreadPoolBuildError) -> Oops { - Oops(e.to_string()) - } -} - impl error::Error for Oops {} impl fmt::Display for Oops { @@ -39,7 +32,7 @@ impl fmt::Display for Oops { type Result = result::Result; -fn get(agent: &ureq::Agent, url: &String) -> Result> { +fn get(agent: &ureq::Agent, url: &str) -> Result> { let response = agent.get(url).call()?; let mut reader = response.into_reader(); let mut bytes = vec![]; @@ -47,13 +40,12 @@ fn get(agent: &ureq::Agent, url: &String) -> Result> { Ok(bytes) } -fn get_and_write(agent: &ureq::Agent, url: &String) -> Result<()> { - println!("🕷️ {} {}", Local::now(), url); +fn get_and_write(agent: &ureq::Agent, url: &str) { + info!("🕷️ {}", url); match get(agent, url) { - Ok(_) => println!("✔️ {} {}", Local::now(), url), - Err(e) => println!("⚠️ {} {} {}", Local::now(), url, e), + Ok(_) => info!("✔️ {}", url), + Err(e) => error!("⚠️ {} {}", url, e), } - Ok(()) } fn get_many(urls: Vec, simultaneous_fetches: usize) -> Result<()> { @@ -61,12 +53,26 @@ fn get_many(urls: Vec, simultaneous_fetches: usize) -> Result<()> { .timeout_connect(Duration::from_secs(5)) .timeout(Duration::from_secs(20)) .build(); - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(simultaneous_fetches) - .build()?; - pool.scope(|_| { - urls.par_iter().map(|u| get_and_write(&agent, u)).count(); - }); + let mutex = Arc::new(Mutex::new(urls)); + let mut join_handles: Vec> = vec![]; + for _ in 0..simultaneous_fetches { + let mutex = mutex.clone(); + let agent = agent.clone(); + join_handles.push(thread::spawn(move || loop { + let mut guard = mutex.lock().unwrap(); + let u = match guard.pop() { + Some(u) => u, + None => return, + }; + + drop(guard); + + get_and_write(&agent, &u); + })); + } + for h in join_handles { + h.join().map_err(|e| Oops(format!("{:?}", e)))?; + } Ok(()) }