Завершение и очистка
Код в Листинге 21-20 отвечает на запросы асинхронно с использованием пула потоков, как мы и планировали. Мы получаем некоторые предупреждения о полях workers, id и thread, которые мы не используем напрямую, что напоминает нам, что мы ничего не очищаем. Когда мы используем менее элегантный метод Ctrl-C для остановки основного потока, все остальные потоки также немедленно останавливаются, даже если они находятся в процессе обслуживания запроса.
Далее мы реализуем трейт Drop для вызова join на каждом потоке в пуле, чтобы они могли завершить запросы, над которыми работают, перед закрытием. Затем мы реализуем способ сообщить потокам, что они должны прекратить принимать новые запросы и завершиться. Чтобы увидеть этот код в действии, мы изменим наш сервер так, чтобы он принимал только два запроса перед грациозным завершением работы своего пула потоков.
Одна вещь, которую стоит заметить: ничто из этого не влияет на части кода, которые обрабатывают выполнение замыканий, поэтому всё здесь было бы таким же, если бы мы использовали пул потоков для асинхронной среды выполнения.
Реализация трейта Drop для ThreadPool
Давайте начнём с реализации Drop для нашего пула потоков. Когда пул уничтожается, все наши потоки должны присоединиться (join), чтобы убедиться, что они завершают свою работу. Листинг 21-22 показывает первую попытку реализации Drop; этот код пока не будет работать.
Файл: src/lib.rs
#![allow(unused)] fn main() { [Этот код не компилируется!] impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } }
Листинг 21-22: Присоединение к каждому потоку при выходе пула потоков из области видимости
Сначала мы проходим в цикле по каждому работнику пула потоков. Мы используем &mut для этого, потому что self является изменяемой ссылкой, и нам также нужно иметь возможность изменять worker. Для каждого работника мы печатаем сообщение о том, что этот конкретный экземпляр Worker завершает работу, а затем вызываем join для потока этого экземпляра Worker. Если вызов join завершается неудачей, мы используем unwrap, чтобы Rust запаниковал и перешёл к неграциозному завершению.
Вот ошибка, которую мы получаем при компиляции этого кода:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
Ошибка сообщает нам, что мы не можем вызвать join, потому что у нас есть только изменяемое заимствование каждого работника, а join принимает владение своим аргументом. Чтобы решить эту проблему, нам нужно переместить поток из экземпляра Worker, который владеет thread, чтобы join мог потребить поток. Один из способов сделать это — использовать тот же подход, который мы использовали в Листинге 18-15. Если бы Worker содержал Option<thread::JoinHandle<()>>, мы могли бы вызвать метод take на Option, чтобы переместить значение из варианта Some и оставить вариант None на его месте. Другими словами, работающий Worker имел бы вариант Some в thread, а когда мы хотели бы очистить Worker, мы заменили бы Some на None, чтобы у Worker не было потока для выполнения.
Однако единственный раз, когда это потребовалось бы, — это при уничтожении Worker. В обмен нам пришлось бы иметь дело с Option<thread::JoinHandle<()>> везде, где мы обращаемся к worker.thread. В идиоматичном Rust Option используется довольно часто, но когда вы обнаруживаете, что оборачиваете что-то, что, как вы знаете, всегда будет присутствовать, в Option в качестве обходного пути, как в этом случае, стоит поискать альтернативные подходы, чтобы сделать ваш код чище и менее подверженным ошибкам.
В данном случае существует лучшая альтернатива: метод Vec::drain. Он принимает параметр диапазона, чтобы указать, какие элементы удалить из вектора, и возвращает итератор этих элементов. Передача синтаксиса диапазона .. удалит все значения из вектора.
Итак, нам нужно обновить реализацию drop для ThreadPool следующим образом:
Файл: src/lib.rs
#![allow(unused)] fn main() { impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } }
Это решает ошибку компилятора и не требует никаких других изменений в нашем коде. Обратите внимание, что поскольку drop может быть вызван при панике, unwrap также может вызвать панику и привести к двойной панике, что немедленно аварийно завершает программу и прерывает любую выполняющуюся очистку. Это приемлемо для примерной программы, но не рекомендуется для производственного кода.
Сигнализация потокам о прекращении прослушивания заданий
Со всеми внесёнными нами изменениями наш код компилируется без каких-либо предупреждений. Однако плохая новость заключается в том, что этот код пока не функционирует так, как мы хотим. Ключевым моментом является логика в замыканиях, выполняемых потоками экземпляров Worker: в настоящее время мы вызываем join, но это не завершит потоки, потому что они работают в бесконечном цикле, ищущем задания. Если мы попытаемся уничтожить наш ThreadPool с текущей реализацией drop, основной поток заблокируется навсегда, ожидая завершения первого потока.
Чтобы исправить эту проблему, нам нужно изменить реализацию drop для ThreadPool, а затем изменить цикл Worker.
Сначала мы изменим реализацию drop для ThreadPool, чтобы явно уничтожить sender перед ожиданием завершения потоков. Листинг 21-23 показывает изменения в ThreadPool для явного уничтожения sender. В отличие от thread, здесь нам действительно нужно использовать Option, чтобы иметь возможность переместить sender из ThreadPool с помощью Option::take.
Файл: src/lib.rs
#![allow(unused)] fn main() { [Этот код не производит желаемого поведения.] pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --пропуск-- impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // --пропуск-- ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } }
Листинг 21-23: Явное уничтожение sender перед присоединением к потокам Worker
Уничтожение sender закрывает канал, что указывает на то, что больше сообщений отправляться не будет. Когда это происходит, все вызовы recv, которые экземпляры Worker делают в бесконечном цикле, будут возвращать ошибку. В Листинге 21-24 мы изменяем цикл Worker для грациозного выхода из цикла в этом случае, что означает, что потоки завершатся, когда реализация drop для ThreadPool вызовет join для них.
Файл: src/lib.rs
#![allow(unused)] fn main() { impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } }); Worker { id, thread } } } }
Листинг 21-24: Явный выход из цикла при возврате ошибки recv
Чтобы увидеть этот код в действии, давайте изменим main для принятия только двух запросов перед грациозным завершением работы сервера, как показано в Листинге 21-25.
Файл: src/main.rs
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }
Листинг 21-25: Завершение работы сервера после обслуживания двух запросов путём выхода из цикла
Вы не хотели бы, чтобы реальный веб-сервер завершал работу после обслуживания всего двух запросов. Этот код просто демонстрирует, что грациозное завершение работы и очистка функционируют правильно.
Метод take определён в трейте Iterator и ограничивает итерацию максимум первыми двумя элементами. ThreadPool выйдет из области видимости в конце main, и реализация drop будет выполнена.
Запустите сервер с помощью cargo run и сделайте три запроса. Третий запрос должен завершиться ошибкой, и в вашем терминале вы должны увидеть вывод, похожий на этот:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Вы можете увидеть другой порядок идентификаторов Worker и напечатанных сообщений. Мы можем увидеть, как работает этот код, из сообщений: экземпляры Worker 0 и 3 получили первые два запроса. Сервер перестал принимать соединения после второго соединения, и реализация Drop для ThreadPool начала выполняться до того, как Worker 3 даже начал свою работу. Уничтожение sender отключает все экземпляры Worker и сообщает им о необходимости завершить работу. Экземпляры Worker каждый печатают сообщение при отключении, а затем пул потоков вызывает join для ожидания завершения каждого потока Worker.
Обратите внимание на один интересный аспект этого конкретного выполнения: ThreadPool уничтожил sender, и до того, как какой-либо Worker получил ошибку, мы попытались присоединиться к Worker 0. Worker 0 ещё не получил ошибку от recv, поэтому основной поток заблокировался, ожидая завершения Worker 0. Тем временем Worker 3 получил задание, а затем все потоки получили ошибку. Когда Worker 0 завершился, основной поток ждал завершения остальных экземпляров Worker. В тот момент они все вышли из своих циклов и остановились.
Поздравляем! Мы завершили наш проект; у нас есть базовый веб-сервер, который использует пул потоков для асинхронного ответа. Мы можем выполнить грациозное завершение работы сервера, которое очищает все потоки в пуле.
Вот полный код для справки:
Файл: src/main.rs
use hello::ThreadPool; use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Файл: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Создаёт новый ThreadPool. /// /// Параметр size - количество потоков в пуле. /// /// # Паника /// /// Функция `new` будет паниковать, если size равен нулю. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } }); Worker { id, thread: Some(thread), } } } }
Мы могли бы сделать больше! Если вы хотите продолжить улучшение этого проекта, вот несколько идей:
- Добавьте больше документации к
ThreadPoolи его публичным методам - Добавьте тесты функциональности библиотеки
- Замените вызовы
unwrapна более надёжную обработку ошибок - Используйте
ThreadPoolдля выполнения некоторых задач, отличных от обслуживания веб-запросов - Найдите крейт пула потоков на crates.io и реализуйте аналогичный веб-сервер с использованием этого крейта. Затем сравните его API и надёжность с реализованным нами пулом потоков
Итог
Отлично сработано! Вы дошли до конца книги! Мы хотим поблагодарить вас за то, что присоединились к нам в этом туре по Rust. Теперь вы готовы реализовывать свои собственные проекты на Rust и помогать с проектами других людей. Помните, что существует welcoming сообщество других Rustaceans, которые будут рады помочь вам с любыми трудностями, с которыми вы столкнётесь на вашем пути в Rust.