示例 20-20 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说 workers、id 和 thread
字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 ctrl-c 终止主线程时,
所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。
现在我们要为 ThreadPool 实现 Drop trait 对线程池中的每一个线程调用 join,这样这些线程将会执行
完他们的请求。接着会为 ThreadPool 实现一个告诉线程他们应该停止接收新请求并结束的方式。为了
实践这些代码,修改 server 在优雅停机(graceful shutdown)之前只接受两个请求。
为 ThreadPool 实现 Drop Trait
现在开始为线程池实现 Drop。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。示例
20-22 展示了 Drop 实现的第一次尝试;这些代码还不能够编译:
文件名: src∕lib.rs
# use std::sync::mpsc;
# use std::sync::Arc;
# use std::sync::Mutex;
# use std::thread;
#
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
#
# type Job = Box<dyn FnOnce() + Send + 'static>;
#
# impl ThreadPool {
# /// Create a new ThreadPool.
# ///
# /// The size is the number of threads in the pool.
# ///
# /// # Panics
# ///
# /// The `new` function will panic if the size is zero.
# pub fn new(size: usize) -> ThreadPool {
# assert!(size > 0);
#
# let (sender, receiver) = mpsc::channel();
#
# let receiver = Arc::new(Mutex::new(receiver));
#
# let mut workers = Vec::with_capacity(size);
#
# for id in 0..size {
# workers.push(Worker::new(id, Arc::clone(&receiver)));
# }
#
# ThreadPool { workers, sender }
# }
#
20.3. 优雅停机与清理 557
# pub fn execute<F>(&self, f: F)
# where
# F: FnOnce() + Send + 'static,
# {
# let job = Box::new(f);
#
# self.sender.send(job).unwrap();
# }
# }
#
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
#
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
# let thread = thread::spawn(move || loop {
# let job = receiver.lock().unwrap().recv().unwrap();
#
# println!("Worker {} got a job; executing.", id);
#
# job();
# });
#
# Worker { id, thread }
# }
# }
示例 20-22: 当线程池离开作用域时 join 每个线程
这里首先遍历线程池中的每个 workers。这里使用了 &mut 因为 self 本身是一个可变引用而且也需要能
够修改 worker。对于每一个线程,会打印出说明信息表明此特定 worker 正在关闭,接着在 worker 线
程上调用 join。如果 join 调用失败,通过 unwrap 使得 panic 并进行不优雅的关闭。
如下是尝试编译代码时得到的错误:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error
558 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER
这告诉我们并不能调用 join,因为只有每一个 worker 的可变借用,而 join 获取其参数的所有权。为了
解决这个问题,需要一个方法将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消费这个线
程。示例 17-15 中我们曾见过这么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,
就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。换句话说,
正在运行的 Worker 的 thread 将是 Some 成员值,而当需要清理 worker 时,将 Some 替换为 None,
这样 worker 就没有可以运行的线程了。
为此需要更新 Worker 的定义为如下:
文件名: src∕lib.rs
# use std::sync::mpsc;
# use std::sync::Arc;
# use std::sync::Mutex;
# use std::thread;
#
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
#
# type Job = Box<dyn FnOnce() + Send + 'static>;
#
# impl ThreadPool {
# /// Create a new ThreadPool.
# ///
# /// The size is the number of threads in the pool.
# ///
# /// # Panics
# ///
# /// The `new` function will panic if the size is zero.
# pub fn new(size: usize) -> ThreadPool {
# assert!(size > 0);
#
# let (sender, receiver) = mpsc::channel();
#
# let receiver = Arc::new(Mutex::new(receiver));
#
# let mut workers = Vec::with_capacity(size);
#
# for id in 0..size {
# workers.push(Worker::new(id, Arc::clone(&receiver)));
# }
#
# ThreadPool { workers, sender }
# }
#
# pub fn execute<F>(&self, f: F)
# where
# F: FnOnce() + Send + 'static,
# {
# let job = Box::new(f);
#
# self.sender.send(job).unwrap();
# }
# }
#
# impl Drop for ThreadPool {
20.3. 优雅停机与清理 559
# fn drop(&mut self) {
# for worker in &mut self.workers {
# println!("Shutting down worker {}", worker.id);
#
# worker.thread.join().unwrap();
# }
# }
# }
#
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
#
# impl Worker {
# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
# let thread = thread::spawn(move || loop {
# let job = receiver.lock().unwrap().recv().unwrap();
#
# println!("Worker {} got a job; executing.", id);
#
# job();
# });
#
# Worker { id, thread }
# }
# }
现在依靠编译器来找出其他需要修改的地方。check 代码会得到两个错误:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in `Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct `JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, Some(thread) }
| +++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors
让我们修复第二个错误,它指向 Worker::new 结尾的代码;当新建 Worker 时需要将 thread 值封装进
Some。做出如下改变以修复问题:
560 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER
文件名: src∕lib.rs
# use std::sync::mpsc;
# use std::sync::Arc;
# use std::sync::Mutex;
# use std::thread;
#
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
#
# type Job = Box<dyn FnOnce() + Send + 'static>;
#
# impl ThreadPool {
# /// Create a new ThreadPool.
# ///
# /// The size is the number of threads in the pool.
# ///
# /// # Panics
# ///
# /// The `new` function will panic if the size is zero.
# pub fn new(size: usize) -> ThreadPool {
# assert!(size > 0);
#
# let (sender, receiver) = mpsc::channel();
#
# let receiver = Arc::new(Mutex::new(receiver));
#
# let mut workers = Vec::with_capacity(size);
#
# for id in 0..size {
# workers.push(Worker::new(id, Arc::clone(&receiver)));
# }
#
# ThreadPool { workers, sender }
# }
#
# pub fn execute<F>(&self, f: F)
# where
# F: FnOnce() + Send + 'static,
# {
# let job = Box::new(f);
#
# self.sender.send(job).unwrap();
# }
# }
#
# impl Drop for ThreadPool {
# fn drop(&mut self) {
# for worker in &mut self.workers {
# println!("Shutting down worker {}", worker.id);
#
# worker.thread.join().unwrap();
# }
# }
# }
20.3. 优雅停机与清理 561
#
# struct Worker {
# id: usize,
# thread: Option<thread::JoinHandle<()>>,
# }
#
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
# let thread = thread::spawn(move || loop {
# let job = receiver.lock().unwrap().recv().unwrap();
#
# println!("Worker {} got a job; executing.", id);
#
# job();
# });
#
Worker {
id,
thread: Some(thread),
}
}
}
第一个错误位于 Drop 实现中。之前提到过要调用 Option 上的 take 将 thread 移动出 worker。如下改
变会修复问题:
文件名: src∕lib.rs
# use std::sync::mpsc;
# use std::sync::Arc;
# use std::sync::Mutex;
# use std::thread;
#
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
#
# type Job = Box<dyn FnOnce() + Send + 'static>;
#
# impl ThreadPool {
# /// Create a new ThreadPool.
# ///
# /// The size is the number of threads in the pool.
# ///
# /// # Panics
# ///
# /// The `new` function will panic if the size is zero.
# pub fn new(size: usize) -> ThreadPool {
# assert!(size > 0);
#
# let (sender, receiver) = mpsc::channel();
#
# let receiver = Arc::new(Mutex::new(receiver));
#
# let mut workers = Vec::with_capacity(size);
562 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER
#
# for id in 0..size {
# workers.push(Worker::new(id, Arc::clone(&receiver)));
# }
#
# ThreadPool { workers, sender }
# }
#
# pub fn execute<F>(&self, f: F)
# where
# F: FnOnce() + Send + 'static,
# {
# let job = Box::new(f);
#
# self.sender.send(job).unwrap();
# }
# }
#
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
#
# struct Worker {
# id: usize,
# thread: Option<thread::JoinHandle<()>>,
# }
#
# impl Worker {
# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
# let thread = thread::spawn(move || loop {
# let job = receiver.lock().unwrap().recv().unwrap();
#
# println!("Worker {} got a job; executing.", id);
#
# job();
# });
#
# Worker {
# id,
# thread: Some(thread),
# }
# }
# }
如第十七章我们见过的,Option 上的 take 方法会取出 Some 而留下 None。使用 if let 解构 Some 并
得到线程,接着在线程上调用 join。如果 worker 的线程已然是 None,就知道此时这个 worker 已经清
理了其线程所以无需做任何操作。