|
|
|
|
|
|
|
|
|
|
|
use std::{ |
|
|
|
|
|
sync::{mpsc, Arc, Mutex}, |
|
|
|
|
|
thread |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub struct ThreadPool { |
|
|
|
|
|
workers: Vec<Worker>, |
|
|
|
|
|
sender: Option<mpsc::Sender<Job>>, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Box: pointer for heap allocation (smart pointer); dyn: unknown type to compile time |
|
|
|
|
|
type Job = Box<dyn FnOnce() + Send + 'static>; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl ThreadPool { |
|
|
|
|
|
/// Create a new ThreadPool. |
|
|
|
|
|
/// |
|
|
|
|
|
/// The size is the number of threads in the pool. |
|
|
|
|
|
/// |
|
|
|
|
|
/// # Panics |
|
|
|
|
|
/// |
|
|
|
|
|
/// The `new` function will panic if the size is zero. |
|
|
|
|
|
pub fn new(size: usize) -> ThreadPool { |
|
|
|
|
|
assert!(size > 0); |
|
|
|
|
|
|
|
|
|
|
|
// create asynchronous channel with sender/receiver pair (for jobs!) |
|
|
|
|
|
let (sender, receiver) = mpsc::channel(); |
|
|
|
|
|
|
|
|
|
|
|
// protect receiver with thread-safe shared (Arc) mutex to avoid multiple access |
|
|
|
|
|
let receiver = Arc::new(Mutex::new(receiver)); |
|
|
|
|
|
|
|
|
|
|
|
// pre(!)allocate space for workers |
|
|
|
|
|
let mut workers = Vec::with_capacity(size); |
|
|
|
|
|
|
|
|
|
|
|
// initialize all workers... |
|
|
|
|
|
for id in 0..size { |
|
|
|
|
|
// create worker with unique id and pointer to the shared mutex |
|
|
|
|
|
workers.push(Worker::new(id, Arc::clone(&receiver))); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// set thread pool variables |
|
|
|
|
|
ThreadPool { |
|
|
|
|
|
workers, |
|
|
|
|
|
// Some --> enum variant (Option): indicates that sender may not exist (=be null) |
|
|
|
|
|
sender: Some(sender), |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn execute<F>(&self, f: F) |
|
|
|
|
|
where |
|
|
|
|
|
// FnOnce() --> only callable once; Send --> transfer to another thread; 'static --> lifetime |
|
|
|
|
|
F: FnOnce() + Send + 'static, |
|
|
|
|
|
{ |
|
|
|
|
|
let job = Box::new(f); |
|
|
|
|
|
|
|
|
|
|
|
// commit job to sender object --> pass to threads for execution |
|
|
|
|
|
self.sender.as_ref().unwrap().send(job).unwrap(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Drop for ThreadPool { |
|
|
|
|
|
fn drop(&mut self) { |
|
|
|
|
|
// drop sender first |
|
|
|
|
|
drop(self.sender.take()); |
|
|
|
|
|
|
|
|
|
|
|
// then drop workers |
|
|
|
|
|
for worker in &mut self.workers { |
|
|
|
|
|
println!("Shutting down worker {}", worker.id); |
|
|
|
|
|
|
|
|
|
|
|
// join worker thread |
|
|
|
|
|
if let Some(thread) = worker.thread.take() { |
|
|
|
|
|
thread.join().unwrap(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct Worker { |
|
|
|
|
|
id: usize, |
|
|
|
|
|
thread: Option<thread::JoinHandle<()>>, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Worker { |
|
|
|
|
|
/// Create a new Worker.. |
|
|
|
|
|
/// |
|
|
|
|
|
/// The id is a unique identifier. |
|
|
|
|
|
/// The receiver is a shared pointer to a receiver protected by Mutex. |
|
|
|
|
|
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { |
|
|
|
|
|
// spawn thread and enter infinite loop |
|
|
|
|
|
let thread = thread::spawn(move || loop { |
|
|
|
|
|
// lock mutex and wait for job |
|
|
|
|
|
let message = receiver.lock().unwrap().recv(); |
|
|
|
|
|
|
|
|
|
|
|
match message { |
|
|
|
|
|
// normal operation --> execute job |
|
|
|
|
|
Ok(job) => { |
|
|
|
|
|
println!("Worker {id} got a job; executing."); |
|
|
|
|
|
|
|
|
|
|
|
job(); |
|
|
|
|
|
} |
|
|
|
|
|
// exit gracefully when pool is closed --> recv will return error then |
|
|
|
|
|
Err(_) => { |
|
|
|
|
|
println!("Worker {id} disconnected; shutting down."); |
|
|
|
|
|
break; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
Worker { |
|
|
|
|
|
id, |
|
|
|
|
|
thread: Some(thread), |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |