Конкурентность с разделяемым состоянием
Передача сообщений — это хороший способ обработки конкурентности, но не единственный. Другой метод заключается в том, чтобы несколько потоков обращались к одним и тем же разделяемым данным. Рассмотрим ещё раз эту часть слогана из документации языка Go: «Не общайтесь, разделяя память».
Как бы выглядело общение путём разделения памяти? Кроме того, почему энтузиасты передачи сообщений предостерегают от использования разделения памяти?
В некотором смысле каналы в любом языке программирования похожи на единоличное владение, потому что once вы передаёте значение по каналу, вы больше не должны использовать это значение. Конкурентность с разделяемой памятью похожа на множественное владение: несколько потоков могут обращаться к одной и той же области памяти одновременно. Как вы видели в Главе 15, где умные указатели сделали множественное владение возможным, множественное владение может добавлять сложности, потому что этими разными владельцами нужно управлять. Система типов и правила владения Rust greatly помогают в правильной организации этого управления. Для примера давайте рассмотрим мьютексы — один из наиболее распространённых примитивов конкурентности для разделяемой памяти.
Управление доступом с помощью мьютексов
Мьютекс — это сокращение от mutual exclusion (взаимное исключение), так как мьютекс позволяет only одному потоку получать доступ к некоторым данным в любой given момент времени. Чтобы получить доступ к данным в мьютексе, поток должен сначала сигнализировать, что он хочет доступ, запросив захват блокировки мьютекса. Блокировка — это структура данных, которая является частью мьютекса и отслеживает, кто в настоящее время имеет эксклюзивный доступ к данным. Поэтому мьютекс описывается как защищающий данные, которые он содержит, через систему блокировок.
Мьютексы имеют репутацию сложных в использовании, потому что вы должны помнить два правила:
- Вы должны попытаться захватить блокировку before использования данных.
- Когда вы закончили работать с данными, которые защищает мьютекс, вы должны разблокировать данные, чтобы другие потоки могли захватить блокировку.
Для реальной метафоры мьютекса представьте панельную дискуссию на конференции only с одним микрофоном. Прежде чем участник панели сможет говорить, он должен попросить или сигнализировать, что хочет использовать микрофон. Когда он получает микрофон, он может говорить столько, сколько захочет, а затем передать микрофон следующему участнику панели, который попросит слово. Если участник панели забудет передать микрофон, когда закончит, никто другой не сможет говорить. Если управление общим микрофоном пойдёт не так, панель не будет работать как planned!
Управление мьютексами может быть невероятно сложным для правильной реализации, поэтому многие люди enthusiastic о каналах. Однако благодаря системе типов и правилам владения Rust вы не можете ошибиться с блокировкой и разблокировкой.
API Mutex<T>
В качестве примера использования мьютекса давайте начнём с использования мьютекса в однопоточном контексте, как показано в Листинге 16-12.
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println!("m = {m:?}"); }
Как и для многих типов, мы создаём Mutex<T> с помощью ассоциированной функции new. Чтобы получить доступ к данным внутри мьютекса, мы используем метод lock для получения блокировки. Этот вызов будет блокировать текущий поток, чтобы он не мог выполнять any работу, пока не наступит его очередь получить блокировку.
Вызов lock завершится ошибкой, если другой поток, удерживающий блокировку, запаникует. В этом случае никто никогда не сможет получить блокировку, поэтому мы выбрали unwrap и заставляем этот поток паниковать, если мы окажемся в такой ситуации.
После того как мы получили блокировку, мы можем обращаться с возвращаемым значением, названным num в этом случае, как с изменяемой ссылкой на внутренние данные. Система типов гарантирует, что мы получим блокировку before использования значения в m. Тип m — Mutex<i32>, а не i32, поэтому мы должны вызвать lock, чтобы иметь возможность использовать значение i32. Мы не можем забыть; система типов не позволит нам получить доступ к внутреннему i32 иначе.
Вызов lock возвращает тип с именем MutexGuard, обёрнутый в LockResult, который мы обработали вызовом unwrap. Тип MutexGuard реализует Deref для указания на наши внутренние данные; этот тип также имеет реализацию Drop, которая автоматически освобождает блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости. В результате мы не рискуем забыть освободить блокировку и заблокировать мьютекс для использования другими потоками, потому что освобождение блокировки происходит automatically.
После сброса блокировки мы можем вывести значение мьютекса и увидеть, что нам удалось изменить внутреннее i32 на 6.
Разделяемый доступ к Mutex<T>
Теперь давайте попробуем разделить значение между несколькими потоками, используя Mutex<T>. Мы запустим 10 потоков и заставим каждый из них увеличить значение счётчика на 1, чтобы счётчик изменился с 0 на 10. Пример в Листинге 16-13 будет содержать ошибку компиляции, и мы используем эту ошибку, чтобы узнать больше об использовании Mutex<T> и о том, как Rust помогает нам использовать его correctly.
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Мы создаём переменную counter, чтобы хранить i32 внутри Mutex<T>, как мы делали в Листинге 16-12. Далее мы создаём 10 потоков, перебирая диапазон чисел. Мы используем thread::spawn и даём всем потокам одно и то же замыкание: такое, которое перемещает counter в поток, получает блокировку на Mutex<T>, вызывая метод lock, а затем добавляет 1 к значению в мьютексе. Когда поток завершает выполнение своего замыкания, num выходит из области видимости и освобождает блокировку, чтобы другой поток мог её получить.
В главном потоке мы собираем все дескрипторы присоединения (join handles). Затем, как мы делали в Листинге 16-2, мы вызываем join для каждого дескриптора, чтобы убедиться, что все потоки завершатся. В этот момент главный поток получит блокировку и выведет результат работы программы.
Мы намекнули, что этот пример не скомпилируется. Теперь давайте выясним, почему!
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
--> src/main.rs:21:29
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
8 | for _ in 0..10 {
| -------------- inside of this loop
9 | let handle = thread::spawn(move || {
| ------- value moved into closure here, in previous iteration of loop
...
21 | println!("Result: {}", *counter.lock().unwrap());
| ^^^^^^^ value borrowed here after move
|
help: consider moving the expression out of the loop so it is only moved once
|
8 ~ let mut value = counter.lock();
9 ~ for _ in 0..10 {
10 | let handle = thread::spawn(move || {
11 ~ let mut num = value.unwrap();
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
Сообщение об ошибке гласит, что значение counter было перемещено на предыдущей итерации цикла. Rust говорит нам, что мы не можем переместить владение блокировкой counter в несколько потоков. Давайте исправим ошибку компиляции с помощью метода множественного владения, который мы обсуждали в Главе 15.
Множественное владение с несколькими потоками
В Главе 15 мы передавали значение нескольким владельцам, используя умный указатель Rc<T> для создания значения с подсчётом ссылок. Давайте сделаем то же самое здесь и посмотрим, что произойдёт. Мы обернём Mutex<T> в Rc<T> в Листинге 16-14 и клонируем Rc<T> before перемещения владения в поток.
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Снова компилируем и получаем... другие ошибки! Компилятор учит нас многому:
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
| | |
| | required by a bound introduced by this call
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________^ `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<std::sync::Mutex<i32>>`
note: required because it's used within this closure
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:723:1
For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
Вау, это сообщение об ошибке очень многословно! Вот важная часть, на которой нужно сосредоточиться: `Rc<Mutex<i32>>` cannot be sent between threads safely. Компилятор также сообщает нам причину: the trait `Send` is not implemented for `Rc<Mutex<i32>>`. Мы поговорим о Send в следующем разделе: это один из трейтов, который гарантирует, что типы, которые мы используем с потоками, предназначены для использования в конкурентных ситуациях.
К сожалению, Rc<T> небезопасно разделять между потоками. Когда Rc<T> управляет счётчиком ссылок, он добавляет к счётчику при каждом вызове clone и вычитает из счётчика, когда каждый клон удаляется. Но он не использует any примитивы конкурентности, чтобы гарантировать, что изменения счётчика не могут быть прерваны другим потоком. Это может привести к неверным счетчикам — subtle ошибкам, которые, в свою очередь, могут привести к утечкам памяти или удалению значения before того, как мы закончим с ним работать. Что нам нужно, так это тип, exactly похожий на Rc<T>, но который вносит изменения в счётчик ссылок потокобезопасным способом.
Атомарный подсчёт ссылок с Arc<T>
К счастью, Arc<T> является типом, подобным Rc<T>, который безопасно использовать в конкурентных ситуациях. Буква a означает atomic (атомарный), то есть это атомарно подсчитываемый тип ссылок. Атомарные операции — это дополнительный вид примитивов конкурентности, которые мы не будем здесь подробно рассматривать: см. документацию стандартной библиотеки для std::sync::atomic для получения более detailed информации. На данном этапе вам просто нужно знать, что атомарные операции работают как примитивные типы, но безопасны для разделения между потоками.
Вам может быть интересно, почему все примитивные типы не являются атомарными и почему типы стандартной библиотеки не реализованы для использования Arc<T> по умолчанию. Причина в том, что потокобезопасность comes с штрафом производительности, который вы хотите платить only тогда, когда вам это really нужно. Если вы просто выполняете операции со значениями в within одного потока, ваш код может работать faster, если ему не нужно обеспечивать гарантии, предоставляемые атомарными операциями.
Вернёмся к нашему примеру: Arc<T> и Rc<T> имеют одинаковый API, поэтому мы исправляем нашу программу, изменяя строку use, вызов new и вызов clone. Код в Листинге 16-15 finally скомпилируется и запустится.
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
Этот код выведет следующее:
Result: 10
Мы сделали это! Мы посчитали от 0 до 10, что может показаться не очень впечатляющим, но это научило нас многому о Mutex<T> и потокобезопасности. Вы также можете использовать структуру этой программы для выполнения более сложных операций, чем просто увеличение счётчика. Используя эту стратегию, вы можете разделить вычисление на независимые части, распределить эти части по потокам, а затем использовать Mutex<T>, чтобы каждый поток обновлял конечный результат своей частью.
Обратите внимание, что если вы выполняете простые числовые операции, существуют типы проще, чем Mutex<T>, предоставляемые модулем std::sync::atomic стандартной библиотеки. Эти типы обеспечивают безопасный, конкурентный, атомарный доступ к примитивным типам. Мы выбрали использование Mutex<T> с примитивным типом для этого примера, чтобы мы могли сосредоточиться на том, как работает Mutex<T>.
Сравнение RefCell<T>/Rc<T> и Mutex<T>/Arc<T>
Вы, возможно, заметили, что counter является неизменяемым, но мы могли получить изменяемую ссылку на значение внутри него; это означает, что Mutex<T> обеспечивает внутреннюю изменяемость (interior mutability), как и семейство Cell. Таким же образом, как мы использовали RefCell<T> в Главе 15, чтобы позволить нам изменять содержимое внутри Rc<T>, мы используем Mutex<T> для изменения содержимого внутри Arc<T>.
Ещё одна деталь, которую следует отметить, заключается в том, что Rust не может защитить вас от всех видов логических ошибок, когда вы используете Mutex<T>. Вспомните из Главы 15, что использование Rc<T> связано с риском создания циклов ссылок, где два значения Rc<T> ссылаются друг на друга, вызывая утечки памяти. Аналогично, Mutex<T> связан с риском создания взаимных блокировок (deadlocks). Они возникают, когда операции необходимо заблокировать два ресурса, и два потока захватили по одной блокировке, заставляя их ждать друг друга вечно. Если вам интересны взаимные блокировки, попробуйте создать программу на Rust, в которой есть взаимная блокировка; затем изучите стратегии смягчения взаимных блокировок для мьютексов на любом языке и попробуйте реализовать их в Rust. Документация API стандартной библиотеки для Mutex<T> и MutexGuard предлагает useful информацию.
Мы завершим эту главу, рассказав о трейтах Send и Sync и о том, как мы можем использовать их с пользовательскими типами.