От однопоточного к многопоточному серверу
В настоящее время сервер обрабатывает каждый запрос последовательно, что означает, что он не будет обрабатывать второе соединение, пока не закончится обработка первого. Если сервер получает всё больше и больше запросов, такое последовательное выполнение становится всё менее оптимальным. Если сервер получает запрос, обработка которого занимает много времени, последующие запросы должны будут ждать, пока длинный запрос не завершится, даже если новые запросы могут быть обработаны быстро. Нам нужно это исправить, но сначала мы посмотрим на проблему в действии.
Имитация медленного запроса
Мы посмотрим, как медленно обрабатываемый запрос может повлиять на другие запросы в нашей текущей реализации сервера. Листинг 21-10 реализует обработку запроса к /sleep с имитацией медленного ответа, который заставит сервер спать в течение пяти секунд перед ответом.
Файл: src/main.rs
#![allow(unused)] fn main() { use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --пропуск-- fn handle_connection(mut stream: TcpStream) { // --пропуск-- let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --пропуск-- } }
Листинг 21-10: Имитация медленного запроса с помощью задержки на пять секунд
Мы перешли от if к match, так как теперь у нас три случая. Нам нужно явно сопоставить срез request_line со строковыми литеральными значениями; match не выполняет автоматического взятия ссылки и разыменования, как это делает метод равенства.
Первая ветка такая же, как блок if из Листинга 21-9. Вторая ветка соответствует запросу к /sleep. Когда этот запрос получен, сервер будет спать в течение пяти секунд перед отображением успешной HTML-страницы. Третья ветка такая же, как блок else из Листинга 21-9.
Вы можете видеть, насколько примитивен наш сервер: реальные библиотеки обрабатывали бы распознавание нескольких запросов гораздо менее многословным способом!
Запустите сервер с помощью cargo run. Затем откройте два окна браузера: одно для http://127.0.0.1:7878 и другое для http://127.0.0.1:7878/sleep. Если вы введёте URI / несколько раз, как и раньше, вы увидите, что он быстро отвечает. Но если вы введёте /sleep и затем загрузите /, вы увидите, что / ждёт, пока sleep не проспит свои полные пять секунд, прежде чем загрузиться.
Существует несколько техник, которые мы могли бы использовать, чтобы избежать накопления запросов из-за медленного запроса, включая использование async, как мы делали в Главе 17; та, которую мы реализуем — это пул потоков.
Улучшение пропускной способности с помощью пула потоков
Пул потоков — это группа заранее созданных потоков, которые готовы и ожидают обработки задачи. Когда программа получает новую задачу, она назначает один из потоков в пуле для этой задачи, и этот поток будет обрабатывать задачу. Оставшиеся потоки в пуле доступны для обработки любых других задач, которые поступают, пока первый поток обрабатывает свою задачу. Когда первый поток завершает обработку своей задачи, он возвращается в пул свободных потоков, готовый обработать новую задачу. Пул потоков позволяет обрабатывать соединения параллельно, увеличивая пропускную способность вашего сервера.
Мы ограничим количество потоков в пуле небольшим числом, чтобы защититься от DoS-атак; если бы наша программа создавала новый поток для каждого поступающего запроса, кто-то, делая 10 миллионов запросов к нашему серверу, мог бы устроить хаос, исчерпав все ресурсы нашего сервера и полностью остановив обработку запросов.
Вместо создания неограниченного количества потоков мы будем иметь фиксированное количество потоков, ожидающих в пуле. Поступающие запросы отправляются в пул для обработки. Пул будет поддерживать очередь входящих запросов. Каждый из потоков в пуле будет брать запрос из этой очереди, обрабатывать его, а затем запрашивать следующий запрос из очереди. При такой конструкции мы можем обрабатывать до N запросов одновременно, где N — количество потоков. Если каждый поток обрабатывает долгий запрос, последующие запросы всё равно могут накапливаться в очереди, но мы увеличили количество долгих запросов, которые можем обработать до достижения этой точки.
Эта техника — лишь один из многих способов улучшить пропускную способность веб-сервера. Другие варианты, которые вы можете исследовать, — это модель fork/join, однопоточная модель асинхронного I/O и многопоточная модель асинхронного I/O. Если вам интересна эта тема, вы можете почитать о других решениях и попробовать реализовать их; с низкоуровневым языком вроде Rust все эти варианты возможны.
Прежде чем мы начнём реализацию пула потоков, давайте обсудим, как должно выглядеть использование пула. Когда вы пытаетесь спроектировать код, написание клиентского интерфейса сначала может помочь направлять ваш дизайн. Напишите API кода так, чтобы он был структурирован так, как вы хотите его вызывать; затем реализуйте функциональность внутри этой структуры, вместо того чтобы реализовывать функциональность и затем проектировать публичный API.
Подобно тому, как мы использовали разработку через тестирование в проекте в Главе 12, мы здесь используем разработку через компилятор. Мы напишем код, который вызывает функции, которые мы хотим, а затем посмотрим на ошибки от компилятора, чтобы определить, что нам следует изменить дальше, чтобы код заработал. Однако прежде чем мы сделаем это, мы исследуем технику, которую не будем использовать в качестве отправной точки.
Создание потока для каждого запроса
Сначала давайте исследуем, как мог бы выглядеть наш код, если бы он создавал новый поток для каждого соединения. Как упоминалось ранее, это не наш окончательный план из-за проблем с потенциальным созданием неограниченного количества потоков, но это отправная точка, чтобы сначала получить работающий многопоточный сервер. Затем мы добавим пул потоков как улучшение, и сравнение двух решений будет проще.
Листинг 21-11 показывает изменения, которые нужно внести в main, чтобы создавать новый поток для обработки каждого потока внутри цикла for.
Файл: src/main.rs
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
Листинг 21-11: Создание нового потока для каждого потока соединения
Как вы узнали в Главе 16, thread::spawn создаст новый поток, а затем запустит код в замыкании в новом потоке. Если вы запустите этот код и загрузите /sleep в вашем браузере, а затем / в двух дополнительных вкладках браузера, вы действительно увидите, что запросы к / не должны ждать завершения /sleep. Однако, как мы упоминали, это в конечном итоге перегрузит систему, потому что вы создаёте новые потоки без каких-либо ограничений.
Вы также можете вспомнить из Главы 17, что это именно та ситуация, где async и await действительно сияют! Держите это в уме, пока мы создаём пул потоков, и подумайте о том, как всё будет выглядеть по-другому или так же с использованием async.
Создание ограниченного количества потоков
Мы хотим, чтобы наш пул потоков работал похожим, знакомым образом, чтобы переход от потоков к пулу потоков не требовал больших изменений в коде, использующем наш API. Листинг 21-12 показывает гипотетический интерфейс для структуры ThreadPool, который мы хотим использовать вместо thread::spawn.
Файл: src/main.rs
[Этот код не компилируется!] fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
Листинг 21-12: Наш идеальный интерфейс ThreadPool
Мы используем ThreadPool::new для создания нового пула потоков с настраиваемым количеством потоков, в данном случае четыре. Затем в цикле for pool.execute имеет интерфейс, похожий на thread::spawn, в том смысле, что он принимает замыкание, которое пул должен выполнить для каждого потока. Нам нужно реализовать pool.execute так, чтобы он принимал замыкание и передавал его потоку в пуле для выполнения. Этот код пока не будет компилироваться, но мы попробуем, чтобы компилятор мог направить нас в том, как это исправить.
Создание ThreadPool с помощью разработки через компилятор
Внесите изменения из Листинга 21-12 в src/main.rs, и затем давайте используем ошибки компилятора из cargo check для управления нашей разработкой. Вот первая ошибка, которую мы получаем:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Отлично! Эта ошибка говорит нам, что нам нужен тип или модуль ThreadPool, поэтому мы создадим его сейчас. Наша реализация ThreadPool будет независимой от вида работы, которую выполняет наш веб-сервер. Поэтому давайте переключим крейт hello из бинарного крейта в библиотечный крейт, чтобы разместить нашу реализацию ThreadPool. После изменения на библиотечный крейт мы также могли бы использовать отдельную библиотеку пула потоков для любой работы, которую мы хотим выполнять с использованием пула потоков, а не только для обслуживания веб-запросов.
Создайте файл src/lib.rs, который содержит следующее — это простейшее определение структуры ThreadPool, которое мы можем иметь на данный момент:
Файл: src/lib.rs
#![allow(unused)] fn main() { pub struct ThreadPool; }
Затем отредактируйте файл main.rs, чтобы включить ThreadPool в область видимости из библиотечного крейта, добавив следующий код в начало src/main.rs:
Файл: src/main.rs
#![allow(unused)] fn main() { use hello::ThreadPool; }
Этот код всё ещё не будет работать, но давайте проверим его снова, чтобы получить следующую ошибку, которую нам нужно решить:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Эта ошибка указывает, что далее нам нужно создать ассоциированную функцию с именем new для ThreadPool. Мы также знаем, что new должен иметь один параметр, который может принимать 4 в качестве аргумента, и должен возвращать экземпляр ThreadPool. Давайте реализуем простейшую функцию new, которая будет иметь эти характеристики:
Файл: src/lib.rs
#![allow(unused)] fn main() { pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } } }
Мы выбрали usize в качестве типа параметра size, потому что знаем, что отрицательное количество потоков не имеет смысла. Мы также знаем, что будем использовать это 4 как количество элементов в коллекции потоков, для чего и предназначен тип usize, как обсуждалось в разделе "Целочисленные типы" в Главе 3.
Давайте проверим код снова:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Теперь ошибка возникает потому, что у нас нет метода execute у ThreadPool. Вспомните из раздела "Создание ограниченного количества потоков", что мы решили, что наш пул потоков должен иметь интерфейс, похожий на thread::spawn. Кроме того, мы реализуем функцию execute так, чтобы она принимала переданное ей замыкание и передавала его свободному потоку в пуле для выполнения.
Мы определим метод execute для ThreadPool, чтобы принимать замыкание в качестве параметра. Вспомните из раздела "Перемещение захваченных значений из замыканий" в Главе 13, что мы можем принимать замыкания в качестве параметров с тремя разными трейтами: Fn, FnMut и FnOnce. Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном итоге мы будем делать что-то похожее на реализацию thread::spawn из стандартной библиотеки, поэтому мы можем посмотреть, какие ограничения имеет сигнатура thread::spawn на её параметр. Документация показывает нам следующее:
#![allow(unused)] fn main() { pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, }
Параметр типа F — это тот, который нас здесь интересует; параметр типа T связан с возвращаемым значением, и он нас не интересует. Мы видим, что spawn использует FnOnce как ограничение трейта для F. Это, вероятно, то, что мы тоже хотим, потому что мы в конечном итоге передадим аргумент, который мы получаем в execute, в spawn. Мы можем быть ещё более уверены, что FnOnce — это трейт, который мы хотим использовать, потому что поток для выполнения запроса будет выполнять замыкание этого запроса только один раз, что соответствует Once в FnOnce.
Параметр типа F также имеет ограничение трейта Send и ограничение времени жизни 'static, которые полезны в нашей ситуации: нам нужен Send для передачи замыкания из одного потока в другой и 'static, потому что мы не знаем, сколько времени потоку потребуется для выполнения. Давайте создадим метод execute для ThreadPool, который будет принимать обобщённый параметр типа F с этими ограничениями:
Файл: src/lib.rs
#![allow(unused)] fn main() { impl ThreadPool { // --пропуск-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } }
Мы всё ещё используем () после FnOnce, потому что этот FnOnce представляет замыкание, которое не принимает параметров и возвращает тип unit (). Как и в определениях функций, тип возвращаемого значения может быть опущен в сигнатуре, но даже если у нас нет параметров, нам всё ещё нужны круглые скобки.
Опять же, это простейшая реализация метода execute: он ничего не делает, но мы только пытаемся заставить наш код скомпилироваться. Давайте проверим его снова:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Он компилируется! Но обратите внимание, что если вы попробуете cargo run и сделаете запрос в браузере, вы увидите в браузере те же ошибки, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute!
Примечание: Вы можете слышать высказывание о языках со строгими компиляторами, таких как Haskell и Rust: «Если код компилируется, он работает». Но это высказывание не всегда верно. Наш проект компилируется, но он абсолютно ничего не делает! Если бы мы строили реальный, законченный проект, сейчас было бы хорошее время начать писать модульные тесты, чтобы проверить, что код компилируется и имеет поведение, которое мы хотим.
Вопрос для размышления: Что бы здесь изменилось, если бы мы собирались выполнить future (футуру) вместо замыкания?
Проверка количества потоков в new
Мы ничего не делаем с параметрами для new и execute. Давайте реализуем тела этих функций с тем поведением, которое мы хотим. Для начала давайте подумаем о new. Ранее мы выбрали беззнаковый тип для параметра size, потому что пул с отрицательным количеством потоков не имеет смысла. Однако пул с нулём потоков также не имеет смысла, но ноль является совершенно допустимым usize. Мы добавим код для проверки, что size больше нуля, прежде чем возвращать экземпляр ThreadPool, и заставим программу паниковать, если она получает ноль, с помощью макроса assert!, как показано в Листинге 21-13.
Файл: src/lib.rs
#![allow(unused)] fn main() { impl ThreadPool { /// Создаёт новый ThreadPool. /// /// Параметр size - количество потоков в пуле. /// /// # Паника /// /// Функция `new` будет паниковать, если size равен нулю. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --пропуск-- } }
Листинг 21-13: Реализация ThreadPool::new с паникой при size = 0
Мы также добавили некоторую документацию для нашего ThreadPool с помощью комментарией документации. Обратите внимание, что мы следовали хорошим практикам документации, добавив раздел, который указывает ситуации, в которых наша функция может паниковать, как обсуждалось в Главе 14. Попробуйте запустить cargo doc --open и нажать на структуру ThreadPool, чтобы посмотреть, как сгенерированная документация для new выглядит!
Вместо добавления макроса assert!, как мы сделали здесь, мы могли бы изменить new на build и возвращать Result, как мы делали с Config::build в I/O-проекте в Листинге 12-9. Но мы решили, что в данном случае попытка создать пул потоков без каких-либо потоков должна быть неустранимой ошибкой. Если вы чувствуете амбиции, попробуйте написать функцию с именем build со следующей сигнатурой для сравнения с функцией new:
#![allow(unused)] fn main() { pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> { }
Создание места для хранения потоков
Теперь, когда у нас есть способ узнать, что у нас есть допустимое количество потоков для хранения в пуле, мы можем создать эти потоки и сохранить их в структуре ThreadPool перед возвратом структуры. Но как мы "храним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn:
#![allow(unused)] fn main() { pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, }
Функция spawn возвращает JoinHandle<T>, где T — это тип, который возвращает замыкание. Давайте попробуем тоже использовать JoinHandle и посмотрим, что произойдёт. В нашем случае замыкания, которые мы передаём в пул потоков, будут обрабатывать соединение и ничего не возвращать, поэтому T будет типом unit ().
Код в Листинге 21-14 будет компилироваться, но он пока не создаёт никаких потоков. Мы изменили определение ThreadPool, чтобы он содержал вектор экземпляров thread::JoinHandle<()>, инициализировали вектор с вместимостью size, настроили цикл for, который будет запускать некоторый код для создания потоков, и вернули экземпляр ThreadPool, содержащий их.
Файл: src/lib.rs
#![allow(unused)] fn main() { [Этот код не производит желаемого поведения.] use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { // --пропуск-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // создаём некоторые потоки и сохраняем их в векторе } ThreadPool { threads } } // --пропуск-- } }
Листинг 21-14: Создание вектора для ThreadPool для хранения потоков
Мы добавили std::thread в область видимости в библиотечном крейте, потому что мы используем thread::JoinHandle в качестве типа элементов в векторе в ThreadPool.
После получения допустимого размера наш ThreadPool создаёт новый вектор, который может содержать size элементов. Функция with_capacity выполняет ту же задачу, что и Vec::new, но с важным отличием: она предварительно выделяет место в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, выполнение этого выделения заранее немного более эффективно, чем использование Vec::new, который изменяет свой размер по мере вставки элементов.
Когда вы снова запустите cargo check, он должен завершиться успешно.
Отправка кода из ThreadPool в поток
Мы оставили комментарий в цикле for в Листинге 21-14 относительно создания потоков. Здесь мы рассмотрим, как мы фактически создаём потоки. Стандартная библиотека предоставляет thread::spawn как способ создания потоков, и thread::spawn ожидает получить некоторый код, который поток должен выполнить сразу после создания. Однако в нашем случае мы хотим создать потоки и заставить их ждать код, который мы отправим позже. Реализация потоков в стандартной библиотеке не включает никакого способа сделать это; мы должны реализовать это вручную.
Мы реализуем это поведение, введя новую структуру данных между ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру данных Worker (Работник), что является распространённым термином в реализациях пулов. Worker получает код, который нужно выполнить, и выполняет этот код в своём потоке.
Представьте себе людей, работающих на кухне ресторана: работники ждут, пока поступят заказы от клиентов, а затем они отвечают за принятие этих заказов и их выполнение.
Вместо хранения вектора экземпляров JoinHandle<()> в пуле потоков мы будем хранить экземпляры структуры Worker. Каждый Worker будет хранить один экземпляр JoinHandle<()>. Затем мы реализуем метод для Worker, который будет принимать замыкание кода для выполнения и отправлять его уже запущенному потоку для выполнения. Мы также дадим каждому Worker идентификатор (id), чтобы мы могли различать различные экземпляры Worker в пуле при логировании или отладке.
Вот новый процесс, который будет происходить при создании ThreadPool. Мы реализуем код, который отправляет замыкание в поток, после того как настроим Worker таким образом:
- Определим структуру
Worker, которая содержитidиJoinHandle<()> - Изменим
ThreadPoolдля хранения вектора экземпляровWorker - Определим функцию
Worker::new, которая принимает номерidи возвращает экземплярWorker, содержащийidи поток, созданный с пустым замыканием - В
ThreadPool::newиспользуем счётчик циклаforдля генерацииid, создадим новогоWorkerс этимidи сохранимWorkerв векторе
Если вы готовы к challenge, попробуйте реализовать эти изменения самостоятельно, прежде чем смотреть на код в Листинге 21-15.
Готовы? Вот Листинг 21-15 с одним из способов внесения указанных изменений.
Файл: src/lib.rs
#![allow(unused)] fn main() { use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --пропуск-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --пропуск-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } } }
Листинг 21-15: Изменение ThreadPool для хранения экземпляров Worker вместо прямого хранения потоков
Мы изменили имя поля в ThreadPool с threads на workers, потому что теперь он хранит экземпляры Worker вместо экземпляров JoinHandle<()>. Мы используем счётчик в цикле for в качестве аргумента для Worker::new и сохраняем каждого нового Worker в векторе с именем workers.
Внешний код (например, наш сервер в src/main.rs) не должен знать детали реализации относительно использования структуры Worker внутри ThreadPool, поэтому мы делаем структуру Worker и её функцию new приватными. Функция Worker::new использует переданный ей id и сохраняет экземпляр JoinHandle<()>, который создаётся путём порождения нового потока с использованием пустого замыкания.
Примечание: Если операционная система не может создать поток из-за недостатка системных ресурсов, thread::spawn вызовет панику. Это вызовет панику всего нашего сервера, даже если создание некоторых потоков могло бы завершиться успешно. Для простоты такое поведение приемлемо, но в производственной реализации пула потоков вы, вероятно, захотите использовать std::thread::Builder и его метод spawn, который возвращает Result.
Этот код будет компилироваться и сохранять количество экземпляров Worker, которое мы указали в качестве аргумента для ThreadPool::new. Но мы всё ещё не обрабатываем замыкание, которое получаем в execute. Давайте посмотрим, как это сделать дальше.
Отправка запросов в потоки через каналы
Следующая проблема, которую мы решим, заключается в том, что замыкания, переданные в thread::spawn, абсолютно ничего не делают. В настоящее время мы получаем замыкание, которое хотим выполнить, в методе execute. Но нам нужно передать thread::spawn замыкание для выполнения при создании каждого Worker во время создания ThreadPool.
Мы хотим, чтобы созданные нами структуры Worker получали код для выполнения из очереди, хранящейся в ThreadPool, и отправляли этот код в свой поток для выполнения.
Каналы, которые мы изучили в Главе 16 — простой способ общения между двумя потоками — идеально подходят для этого случая использования. Мы будем использовать канал в качестве очереди заданий, и execute будет отправлять задание из ThreadPool в экземпляры Worker, которые будут отправлять задание в свой поток. Вот план:
ThreadPoolсоздаст канал и будет хранить отправитель (sender)- Каждый
Workerбудет хранить получатель (receiver) - Мы создадим новую структуру
Job, которая будет содержать замыкания, которые мы хотим отправить через канал - Метод
executeбудет отправлять задание, которое он хочет выполнить, через отправитель - В своём потоке
Workerбудет циклически опрашивать свой получатель и выполнять замыкания любых полученных заданий
Давайте начнём с создания канала в ThreadPool::new и хранения отправителя в экземпляре ThreadPool, как показано в Листинге 21-16. Структура Job пока ничего не содержит, но будет типом элемента, который мы отправляем через канал.
Файл: src/lib.rs
#![allow(unused)] fn main() { use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --пропуск-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --пропуск-- } }
Листинг 21-16: Изменение ThreadPool для хранения отправителя канала, передающего экземпляры Job
В ThreadPool::new мы создаём наш новый канал и заставляем пул хранить отправитель. Этот код успешно скомпилируется.
Давайте попробуем передать получатель канала в каждый Worker при создании пула потоков. Мы знаем, что хотим использовать получатель в потоке, который создают экземпляры Worker, поэтому мы будем ссылаться на параметр receiver в замыкании. Код в Листинге 21-17 пока не будет компилироваться.
Файл: src/lib.rs
#![allow(unused)] fn main() { [Этот код не компилируется!] impl ThreadPool { // --пропуск-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --пропуск-- } // --пропуск-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } } }
Листинг 21-17: Передача получателя каждому Worker
Мы сделали небольшие и простые изменения: мы передаём receiver в Worker::new, а затем используем его внутри замыкания.
Когда мы пытаемся проверить этот код, мы получаем следующую ошибку:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
Код пытается передать receiver нескольким экземплярам Worker. Это не сработает, как вы помните из Главы 16: реализация канала, предоставляемая Rust, имеет тип "множество производителей, один потребитель". Это означает, что мы не можем просто клонировать потребительский конец канала, чтобы исправить этот код. Мы также не хотим отправлять сообщение несколько раз нескольким потребителям; мы хотим один список сообщений с несколькими экземплярами Worker, чтобы каждое сообщение обрабатывалось один раз.
Кроме того, извлечение задания из очереди канала involves изменение receiver, поэтому потокам нужен безопасный способ совместного использования и изменения receiver; в противном случае мы можем получить состояние гонки (как рассматривалось в Главе 16).
Вспомните потокобезопасные умные указатели, обсуждавшиеся в Главе 16: чтобы разделить владение между несколькими потоками и позволить потокам изменять значение, нам нужно использовать Arc<Mutex<T>>. Тип Arc позволит нескольким экземплярам Worker владеть receiver, а Mutex гарантирует, что только один Worker получает задание из receiver в любой момент времени. Листинг 21-18 показывает изменения, которые нам нужно сделать.
Файл: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; // --пропуск-- impl ThreadPool { // --пропуск-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --пропуск-- } // --пропуск-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --пропуск-- } } }
Листинг 21-18: Совместное использование получателя среди экземпляров Worker с помощью Arc и Mutex
В ThreadPool::new мы помещаем receiver в Arc и Mutex. Для каждого нового Worker мы клонируем Arc, чтобы увеличить счётчик ссылок, чтобы экземпляры Worker могли совместно владеть receiver.
С этими изменениями код компилируется! Мы почти у цели!
Реализация метода execute
Давайте наконец реализуем метод execute для ThreadPool. Мы также изменим Job из структуры на псевдоним типа для типажа-объекта, который содержит тип замыкания, получаемого execute. Как обсуждалось в разделе "Синонимы типов и псевдонимы типов" в Главе 20, псевдонимы типов позволяют нам делать длинные типы короче для удобства использования. Посмотрите на Листинг 21-19.
Файл: src/lib.rs
#![allow(unused)] fn main() { // --пропуск-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --пропуск-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --пропуск-- }
Листинг 21-19: Создание псевдонима типа Job для Box, содержащего каждое замыкание, и отправка задания через канал
После создания нового экземпляра Job с использованием замыкания, которое мы получаем в execute, мы отправляем это задание через передающий конец канала. Мы вызываем unwrap на send на случай, если отправка не удастся. Это может произойти, если, например, мы остановим выполнение всех наших потоков, что означает, что принимающий конец перестал получать новые сообщения. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap, заключается в том, что мы знаем, что случай неудачи не произойдёт, но компилятор этого не знает.
Но мы ещё не совсем закончили! В Worker наше замыкание, передаваемое в thread::spawn, всё ещё только ссылается на принимающий конец канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающего конца канала и выполняя задание, когда оно его получает. Давайте внесём изменение, показанное в Листинге 21-20, в Worker::new.
Файл: src/lib.rs
#![allow(unused)] fn main() { // --пропуск-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
Листинг 21-20: Получение и выполнение заданий в потоке экземпляра Worker
Здесь мы сначала вызываем lock на receiver, чтобы получить мьютекс, а затем вызываем unwrap для паники при любых ошибках. Получение блокировки может завершиться неудачей, если мьютекс находится в "отравленном" состоянии, что может произойти, если какой-либо другой поток запаниковал, удерживая блокировку, вместо того чтобы освободить её. В этой ситуации вызов unwrap для паники этого потока является правильным действием. Не стесняйтесь заменить этот unwrap на expect с сообщением об ошибке, которое имеет для вас смысл.
Если мы получаем блокировку мьютекса, мы вызываем recv, чтобы получить Job из канала. Последний unwrap также игнорирует любые ошибки здесь, которые могут возникнуть, если поток, удерживающий отправитель, завершил работу, аналогично тому, как метод send возвращает Err, если получатель завершает работу.
Вызов recv блокируется, поэтому если задания ещё нет, текущий поток будет ждать, пока задание не станет доступным. Mutex<T> гарантирует, что только один поток Worker в данный момент пытается запросить задание.
Наш пул потоков теперь находится в рабочем состоянии! Запустите его с помощью cargo run и сделайте несколько запросов:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Успех! Теперь у нас есть пул потоков, который асинхронно выполняет соединения. Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получит много запросов. Если мы сделаем запрос к /sleep, сервер сможет обслуживать другие запросы, поручив их выполнение другому потоку.
Примечание: Если вы откроете /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному с интервалом в пять секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно по причинам кэширования. Это ограничение не вызвано нашим веб-сервером.
Это хорошее время, чтобы остановиться и подумать, как код в Листингах 21-18, 21-19 и 21-20 отличался бы, если бы мы использовали future'ы вместо замыкания для выполняемой работы. Какие типы изменились бы? Как изменились бы сигнатуры методов, если вообще изменились? Какие части кода остались бы прежними?
После изучения цикла while let в Главе 17 и Главе 19 вам может быть интересно, почему мы не написали код потока Worker так, как показано в Листинге 21-21.
Файл: src/lib.rs
#![allow(unused)] fn main() { [Этот код не производит желаемого поведения.] // --пропуск-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
Листинг 21-21: Альтернативная реализация Worker::new с использованием while let
Этот код компилируется и запускается, но не приводит к желаемому поведению потоков: медленный запрос всё равно будет заставлять другие запросы ждать обработки. Причина несколько тонкая: структура Mutex не имеет публичного метода unlock, потому что владение блокировкой основано на времени жизни MutexGuard<T> внутри LockResult<MutexGuard<T>>, который возвращает метод lock. Во время компиляции заимствователь может затем обеспечить правило, что ресурс, защищённый Mutex, не может быть доступен, если мы не держим блокировку. Однако эта реализация также может привести к тому, что блокировка удерживается дольше, чем предполагалось, если мы не mindful о времени жизни MutexGuard<T>.
Код в Листинге 21-20, который использует let job = receiver.lock().unwrap().recv().unwrap();, работает, потому что с let любые временные значения, используемые в выражении справа от знака равенства, немедленно уничтожаются, когда оператор let заканчивается. Однако while let (и if let, и match) не уничтожает временные значения до конца связанного блока. В Листинге 21-21 блокировка остаётся удержанной на время вызова job(), что означает, что другие экземпляры Worker не могут получать задания.