0
点赞
收藏
分享

微信扫一扫

rust 优雅停机与清理


示例 20-20 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说 workers、id 和 thread

字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 ctrl-c 终止主线程时,

所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。

现在我们要为 ThreadPool 实现 Drop trait 对线程池中的每一个线程调用 join,这样这些线程将会执行

完他们的请求。接着会为 ThreadPool 实现一个告诉线程他们应该停止接收新请求并结束的方式。为了

实践这些代码,修改 server 在优雅停机(graceful shutdown)之前只接受两个请求。

为 ThreadPool 实现 Drop Trait

现在开始为线程池实现 Drop。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。示例

20-22 展示了 Drop 实现的第一次尝试;这些代码还不能够编译:

文件名: src∕lib.rs

# use std::sync::mpsc;

# use std::sync::Arc;

# use std::sync::Mutex;

# use std::thread;

#

# pub struct ThreadPool {

# workers: Vec<Worker>,

# sender: mpsc::Sender<Job>,

# }

#

# type Job = Box<dyn FnOnce() + Send + 'static>;

#

# impl ThreadPool {

# /// Create a new ThreadPool.

# ///

# /// The size is the number of threads in the pool.

# ///

# /// # Panics

# ///

# /// The `new` function will panic if the size is zero.

# pub fn new(size: usize) -> ThreadPool {

# assert!(size > 0);

#

# let (sender, receiver) = mpsc::channel();

#

# let receiver = Arc::new(Mutex::new(receiver));

#

# let mut workers = Vec::with_capacity(size);

#

# for id in 0..size {

# workers.push(Worker::new(id, Arc::clone(&receiver)));

# }

#

# ThreadPool { workers, sender }

# }

#

20.3. 优雅停机与清理 557

# pub fn execute<F>(&self, f: F)

# where

# F: FnOnce() + Send + 'static,

# {

# let job = Box::new(f);

#

# self.sender.send(job).unwrap();

# }

# }

#

impl Drop for ThreadPool {

fn drop(&mut self) {

for worker in &mut self.workers {

println!("Shutting down worker {}", worker.id);

worker.thread.join().unwrap();

}

}

}

#

# struct Worker {

# id: usize,

# thread: thread::JoinHandle<()>,

# }

#

# impl Worker {

# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {

# let thread = thread::spawn(move || loop {

# let job = receiver.lock().unwrap().recv().unwrap();

#

# println!("Worker {} got a job; executing.", id);

#

# job();

# });

#

# Worker { id, thread }

# }

# }

示例 20-22: 当线程池离开作用域时 join 每个线程

这里首先遍历线程池中的每个 workers。这里使用了 &mut 因为 self 本身是一个可变引用而且也需要能

够修改 worker。对于每一个线程,会打印出说明信息表明此特定 worker 正在关闭,接着在 worker 线

程上调用 join。如果 join 调用失败,通过 unwrap 使得 panic 并进行不优雅的关闭。

如下是尝试编译代码时得到的错误:

$ cargo check

Checking hello v0.1.0 (file:///projects/hello)

error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference

--> src/lib.rs:52:13

|

52 | worker.thread.join().unwrap();

| ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.

error: could not compile `hello` due to previous error

558 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER

这告诉我们并不能调用 join,因为只有每一个 worker 的可变借用,而 join 获取其参数的所有权。为了

解决这个问题,需要一个方法将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消费这个线

程。示例 17-15 中我们曾见过这么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,

就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。换句话说,

正在运行的 Worker 的 thread 将是 Some 成员值,而当需要清理 worker 时,将 Some 替换为 None,

这样 worker 就没有可以运行的线程了。

为此需要更新 Worker 的定义为如下:

文件名: src∕lib.rs

# use std::sync::mpsc;

# use std::sync::Arc;

# use std::sync::Mutex;

# use std::thread;

#

# pub struct ThreadPool {

# workers: Vec<Worker>,

# sender: mpsc::Sender<Job>,

# }

#

# type Job = Box<dyn FnOnce() + Send + 'static>;

#

# impl ThreadPool {

# /// Create a new ThreadPool.

# ///

# /// The size is the number of threads in the pool.

# ///

# /// # Panics

# ///

# /// The `new` function will panic if the size is zero.

# pub fn new(size: usize) -> ThreadPool {

# assert!(size > 0);

#

# let (sender, receiver) = mpsc::channel();

#

# let receiver = Arc::new(Mutex::new(receiver));

#

# let mut workers = Vec::with_capacity(size);

#

# for id in 0..size {

# workers.push(Worker::new(id, Arc::clone(&receiver)));

# }

#

# ThreadPool { workers, sender }

# }

#

# pub fn execute<F>(&self, f: F)

# where

# F: FnOnce() + Send + 'static,

# {

# let job = Box::new(f);

#

# self.sender.send(job).unwrap();

# }

# }

#

# impl Drop for ThreadPool {

20.3. 优雅停机与清理 559

# fn drop(&mut self) {

# for worker in &mut self.workers {

# println!("Shutting down worker {}", worker.id);

#

# worker.thread.join().unwrap();

# }

# }

# }

#

struct Worker {

id: usize,

thread: Option<thread::JoinHandle<()>>,

}

#

# impl Worker {

# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {

# let thread = thread::spawn(move || loop {

# let job = receiver.lock().unwrap().recv().unwrap();

#

# println!("Worker {} got a job; executing.", id);

#

# job();

# });

#

# Worker { id, thread }

# }

# }

现在依靠编译器来找出其他需要修改的地方。check 代码会得到两个错误:

$ cargo check

Checking hello v0.1.0 (file:///projects/hello)

error[E0599]: no method named `join` found for enum `Option` in the current scope

--> src/lib.rs:52:27

|

52 | worker.thread.join().unwrap();

| ^^^^ method not found in `Option<JoinHandle<()>>`

error[E0308]: mismatched types

--> src/lib.rs:72:22

|

72 | Worker { id, thread }

| ^^^^^^ expected enum `Option`, found struct `JoinHandle`

|

= note: expected enum `Option<JoinHandle<()>>`

found struct `JoinHandle<_>`

help: try wrapping the expression in `Some`

|

72 | Worker { id, Some(thread) }

| +++++ +

Some errors have detailed explanations: E0308, E0599.

For more information about an error, try `rustc --explain E0308`.

error: could not compile `hello` due to 2 previous errors

让我们修复第二个错误,它指向 Worker::new 结尾的代码;当新建 Worker 时需要将 thread 值封装进

Some。做出如下改变以修复问题:

560 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER

文件名: src∕lib.rs

# use std::sync::mpsc;

# use std::sync::Arc;

# use std::sync::Mutex;

# use std::thread;

#

# pub struct ThreadPool {

# workers: Vec<Worker>,

# sender: mpsc::Sender<Job>,

# }

#

# type Job = Box<dyn FnOnce() + Send + 'static>;

#

# impl ThreadPool {

# /// Create a new ThreadPool.

# ///

# /// The size is the number of threads in the pool.

# ///

# /// # Panics

# ///

# /// The `new` function will panic if the size is zero.

# pub fn new(size: usize) -> ThreadPool {

# assert!(size > 0);

#

# let (sender, receiver) = mpsc::channel();

#

# let receiver = Arc::new(Mutex::new(receiver));

#

# let mut workers = Vec::with_capacity(size);

#

# for id in 0..size {

# workers.push(Worker::new(id, Arc::clone(&receiver)));

# }

#

# ThreadPool { workers, sender }

# }

#

# pub fn execute<F>(&self, f: F)

# where

# F: FnOnce() + Send + 'static,

# {

# let job = Box::new(f);

#

# self.sender.send(job).unwrap();

# }

# }

#

# impl Drop for ThreadPool {

# fn drop(&mut self) {

# for worker in &mut self.workers {

# println!("Shutting down worker {}", worker.id);

#

# worker.thread.join().unwrap();

# }

# }

# }

20.3. 优雅停机与清理 561

#

# struct Worker {

# id: usize,

# thread: Option<thread::JoinHandle<()>>,

# }

#

impl Worker {

fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {

// --snip--

# let thread = thread::spawn(move || loop {

# let job = receiver.lock().unwrap().recv().unwrap();

#

# println!("Worker {} got a job; executing.", id);

#

# job();

# });

#

Worker {

id,

thread: Some(thread),

}

}

}

第一个错误位于 Drop 实现中。之前提到过要调用 Option 上的 take 将 thread 移动出 worker。如下改

变会修复问题:

文件名: src∕lib.rs

# use std::sync::mpsc;

# use std::sync::Arc;

# use std::sync::Mutex;

# use std::thread;

#

# pub struct ThreadPool {

# workers: Vec<Worker>,

# sender: mpsc::Sender<Job>,

# }

#

# type Job = Box<dyn FnOnce() + Send + 'static>;

#

# impl ThreadPool {

# /// Create a new ThreadPool.

# ///

# /// The size is the number of threads in the pool.

# ///

# /// # Panics

# ///

# /// The `new` function will panic if the size is zero.

# pub fn new(size: usize) -> ThreadPool {

# assert!(size > 0);

#

# let (sender, receiver) = mpsc::channel();

#

# let receiver = Arc::new(Mutex::new(receiver));

#

# let mut workers = Vec::with_capacity(size);

562 CHAPTER 20. 最后的项目: 构建多线程 WEB SERVER

#

# for id in 0..size {

# workers.push(Worker::new(id, Arc::clone(&receiver)));

# }

#

# ThreadPool { workers, sender }

# }

#

# pub fn execute<F>(&self, f: F)

# where

# F: FnOnce() + Send + 'static,

# {

# let job = Box::new(f);

#

# self.sender.send(job).unwrap();

# }

# }

#

impl Drop for ThreadPool {

fn drop(&mut self) {

for worker in &mut self.workers {

println!("Shutting down worker {}", worker.id);

if let Some(thread) = worker.thread.take() {

thread.join().unwrap();

}

}

}

}

#

# struct Worker {

# id: usize,

# thread: Option<thread::JoinHandle<()>>,

# }

#

# impl Worker {

# fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {

# let thread = thread::spawn(move || loop {

# let job = receiver.lock().unwrap().recv().unwrap();

#

# println!("Worker {} got a job; executing.", id);

#

# job();

# });

#

# Worker {

# id,

# thread: Some(thread),

# }

# }

# }

如第十七章我们见过的,Option 上的 take 方法会取出 Some 而留下 None。使用 if let 解构 Some 并

得到线程,接着在线程上调用 join。如果 worker 的线程已然是 None,就知道此时这个 worker 已经清

理了其线程所以无需做任何操作。

举报

相关推荐

0 条评论