Использование потоков для одновременного выполнения кода

В большинстве современных операционных систем код выполняемой программы запускается в процессе, и операционная система управляет несколькими процессами одновременно. Внутри программы также могут быть независимые части, которые выполняются одновременно. Функциональные возможности, которые запускают эти независимые части, называются потоками. Например, веб-сервер может иметь несколько потоков, чтобы иметь возможность обрабатывать более одного запроса одновременно.

Разделение вычислений в вашей программе на несколько потоков для одновременного выполнения нескольких задач может повысить производительность, но также добавляет сложности. Поскольку потоки могут выполняться одновременно, нет никаких inherent гарантий относительно порядка, в котором части вашего кода в разных потоках будут выполняться. Это может привести к проблемам, таким как:

  • Состояния гонки (Race conditions), при которых потоки обращаются к данным или ресурсам в непоследовательном порядке
  • Взаимные блокировки (Deadlocks), при которых два потока ждут друг друга, не позволяя обоим потокам продолжить выполнение
  • Ошибки, которые происходят только в определённых ситуациях и которые трудно воспроизвести и надёжно исправить

Rust пытается смягчить негативные эффекты от использования потоков, но программирование в многопоточном контексте всё равно требует тщательного обдумывания и структуры кода, которая отличается от программ, выполняющихся в одном потоке.

Языки программирования реализуют потоки несколькими разными способами, и многие операционные системы предоставляют API, который язык программирования может вызывать для создания новых потоков. Стандартная библиотека Rust использует модель реализации потоков 1:1, при которой программа использует один поток операционной системы на один языковой поток. Существуют крейты, которые реализуют другие модели многопоточности, делающие иные компромиссы по сравнению с моделью 1:1. (Асинхронная система Rust, которую мы увидим в следующей главе, также предоставляет другой подход к конкурентности.)

Создание нового потока с помощью spawn

Чтобы создать новый поток, мы вызываем функцию thread::spawn и передаём ей замыкание (мы говорили о замыканиях в Главе 13), содержащее код, который мы хотим выполнить в новом потоке. Пример в Листинге 16-1 выводит некоторый текст из главного потока и другой текст из нового потока.

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }
}

Обратите внимание, что когда главный поток программы Rust завершается, все порождённые потоки завершаются, независимо от того, закончили они выполнение или нет. Вывод этой программы может немного отличаться каждый раз, но он будет выглядеть примерно так:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызовы thread::sleep принудительно останавливают выполнение потока на короткое время, позволяя выполняться другому потоку. Потоки, вероятно, будут чередоваться, но это не гарантировано: это зависит от того, как ваша операционная система планирует потоки. В этом запуске главный поток напечатал первым, даже though оператор печати из порождённого потока appears первым в коде. И даже though мы сказали порождённому потоку печатать до тех пор, пока i не станет 9, он успел дойти только до 5, прежде чем главный поток завершился.

Если вы запустите этот код и увидите вывод только из главного потока или не увидите перекрытия, попробуйте увеличить числа в диапазонах, чтобы создать больше возможностей для операционной системы переключаться между потоками.

Ожидание завершения всех потоков

Код в Листинге 16-1 не только преждевременно останавливает порождённый поток в большинстве случаев из-за завершения главного потока, но и, поскольку нет гарантии порядка выполнения потоков, мы также не можем гарантировать, что порождённый поток вообще будет запущен!

Мы можем исправить проблему с тем, что порождённый поток не запускается или завершается преждевременно, сохранив возвращаемое значение thread::spawn в переменной. Возвращаемый тип thread::spawnJoinHandle<T>. JoinHandle<T> — это владеемое значение, которое при вызове метода join будет ждать завершения своего потока. Листинг 16-2 показывает, как использовать JoinHandle<T> потока, который мы создали в Листинге 16-1, и как вызвать join, чтобы убедиться, что порождённый поток завершится до выхода из main.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Вызов join для дескриптора блокирует текущий выполняющийся поток до тех пор, пока поток, представленный дескриптором, не завершится. Блокировка потока означает, что поток не может выполнять работу или завершаться. Поскольку мы поместили вызов join после цикла for главного потока, выполнение Листинга 16-2 должно давать вывод, похожий на этот:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжают чередоваться, но главный поток ждет из-за вызова handle.join() и не завершается, пока порождённый поток не закончит выполнение.

Но давайте посмотрим, что происходит, когда мы перемещаем handle.join() перед циклом for в main, вот так:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {i} from the main thread!");
        thread::sleep(Duration::from_millis(1));
    }
}

Главный поток будет ждать завершения порождённого потока, а затем выполнит свой цикл for, так что вывод больше не будет перемежаться, как показано здесь:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Мелкие детали, такие как место вызова join, могут влиять на то, выполняются ли ваши потоки одновременно.

Использование замыканий move с потоками

Мы часто будем использовать ключевое слово move с замыканиями, передаваемыми в thread::spawn, потому что замыкание тогда забирает во владение значения, которые оно использует из окружения, таким образом передавая владение этими значениями из одного потока в другой. В разделе «Захват ссылок или перемещение владения» в Главе 13 мы обсуждали move в контексте замыканий. Теперь мы сосредоточимся больше на взаимодействии между move и thread::spawn.

Обратите внимание в Листинге 16-1, что замыкание, которое мы передаём в thread::spawn, не принимает аргументов: мы не используем никакие данные из главного потока в коде порождённого потока. Чтобы использовать данные из главного потока в порождённом потоке, замыкание порождённого потока должно захватывать нужные ему значения. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в порождённом потоке. Однако это пока не будет работать, как вы увидите через мгновение.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {v:?}");
    });

    handle.join().unwrap();
}

Замыкание использует v, поэтому оно захватит v и сделает его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны иметь возможность получить доступ к v внутри этого нового потока. Но когда мы компилируем этот пример, мы получаем следующую ошибку:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {v:?}");
  |                                     - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {v:?}");
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

For more information about this error, try `rustc --explain E0373`.
error: could not compile `threads` (bin "threads") due to 1 previous error

Rust выводит (infers), как захватить v, и поскольку println! нужна только ссылка на v, замыкание пытается заимствовать v. Однако возникает проблема: Rust не может определить, как долго будет работать порождённый поток, поэтому он не знает, будет ли ссылка на v всегда действительной.

Листинг 16-4 предоставляет сценарий, в котором более вероятно наличие ссылки на v, которая будет недействительной.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {v:?}");
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

Если бы Rust позволил нам запустить этот код, существует вероятность, что порождённый поток будет немедленно помещён в фоновый режим без запуска вообще. Порождённый поток имеет ссылку на v внутри, но главный поток немедленно освобождает v, используя функцию drop, которую мы обсуждали в Главе 15. Затем, когда порождённый поток начинает выполняться, v больше не действителен, поэтому ссылка на него также недействительна. О нет!

Чтобы исправить ошибку компилятора в Листинге 16-3, мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Добавляя ключевое слово move перед замыканием, мы принудительно заставляем замыкание забирать во владение значения, которые оно использует, вместо того чтобы позволить Rust вывести, что оно должно заимствовать значения. Модификация Листинга 16-3, показанная в Листинге 16-5, будет компилироваться и работать так, как мы задумали.

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {v:?}");
    });

    handle.join().unwrap();
}

Нас может потяогнуть попробовать то же самое, чтобы исправить код в Листинге 16-4, где главный поток вызывал drop, используя замыкание move. Однако это исправление не сработает, потому то, что пытается сделать Листинг 16-4, запрещено по другой причине. Если бы мы добавили move к замыканию, мы бы переместили v в окружение замыкания и больше не смогли бы вызвать drop для него в главном потоке. Вместо этого мы получили бы эту ошибку компилятора:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
 4 |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`, which does not implement the `Copy` trait
 5 |
 6 |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
 7 |         println!("Here's a vector: {v:?}");
   |                                     - variable moved due to use in closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move
   |
help: consider cloning the value before moving it into the closure
   |
 6 ~     let value = v.clone();
 7 ~     let handle = thread::spawn(move || {
 8 ~         println!("Here's a vector: {value:?}");
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `threads` (bin "threads") due to 1 previous error

Правила владения Rust снова спасли нас! Мы получили ошибку из кода в Листинге 16-3, потому что Rust был консервативен и только заимствовал v для потока, что означало, что главный поток теоретически мог сделать ссылку порождённого потока недействительной. Приказав Rust переместить владение v в порождённый поток, мы гарантируем Rust, что главный поток больше не будет использовать v. Если мы изменим Листинг 16-4 таким же образом, мы тогда нарушим правила владения, когда попытаемся использовать v в главном потоке. Ключевое слово move переопределяет консервативный по умолчанию подход Rust к заимствованию; оно не позволяет нам нарушать правила владения.

Теперь, когда мы рассмотрели, что такое потоки и методы, предоставляемые API потоков, давайте посмотрим на некоторые ситуации, в которых мы можем использовать потоки.