Передача данных между потоками с помощью обмена сообщениями

Одним из всё более популярных подходов к обеспечению безопасной конкурентности является передача сообщений, где потоки или акторы общаются, отправляя друг другу сообщения, содержащие данные. Вот идея, выраженная в слогане из документации языка Go: «Не общайтесь, разделяя память; вместо этого разделяйте память, общаясь».

Для реализации конкурентности с отправкой сообщений стандартная библиотека Rust предоставляет реализацию каналов. Канал — это общая концепция программирования, при которой данные отправляются из одного потока в другой.

Вы можете представить канал в программировании как направленный водный канал, такой как ручей или река. Если вы поместите что-то вроде резиновой уточки в реку, она поплывёт вниз по течению до конца водного пути.

Канал состоит из двух половин: передатчика и приёмника. Половина передатчика — это место upstream, где вы бросаете резиновую уточку в реку, а половина приёмника — это место, где резиновая уточка оказывается downstream. Одна часть вашего кода вызывает методы передатчика с данными, которые вы хотите отправить, а другая часть проверяет принимающий конец на наличие поступающих сообщений. Говорят, что канал закрыт, если либо передатчик, либо приёмник уничтожены.

Здесь мы будем работать над программой, в которой один поток генерирует значения и отправляет их по каналу, а другой поток получает значения и выводит их. Мы будем отправлять простые значения между потоками, используя канал, чтобы проиллюстрировать функциональность. Как только вы ознакомитесь с техникой, вы сможете использовать каналы для любых потоков, которым нужно общаться друг с другом, например, для чат-системы или системы, в которой многие потоки выполняют части вычисления и отправляют части в один поток, который агрегирует результаты.

Сначала, в Листинге 16-6, мы создадим канал, но ничего не будем с ним делать. Обратите внимание, что это пока не скомпилируется, потому что Rust не может определить, значения какого типа мы хотим отправлять по каналу.

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Мы создаём новый канал с помощью функции mpsc::channel; mpsc означает multiple producer, single consumer (несколько производителей, один потребитель). Короче говоря, то, как стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих концов, которые производят значения, но только один принимающий конец, который потребляет эти значения. Представьте несколько ручьев, сливающихся в одну большую реку: всё, что отправлено по любому из ручьев, окажется в одной реке в конце. Мы начнём с одного производителя, но добавим несколько производителей, когда этот пример заработает.

Функция mpsc::channel возвращает кортеж, первый элемент которого — отправляющий конец (передатчик), а второй элемент — принимающий конец (приёмник). Аббревиатуры tx и rx традиционно используются во многих областях для transmitter (передатчик) и receiver (приёмник) соответственно, поэтому мы называем наши переменные так, чтобы обозначить каждый конец. Мы используем оператор let с шаблоном, который деструктурирует кортежи; мы обсудим использование шаблонов в операторах let и деструктуризацию в Главе 19. Пока знайте, что использование оператора let таким образом — это удобный подход для извлечения частей кортежа, возвращаемого mpsc::channel.

Давайте переместим передающий конец в порождённый поток и заставим его отправить одну строку, чтобы порождённый поток общался с главным потоком, как показано в Листинге 16-7. Это похоже на помещение резиновой уточки в реку upstream или отправку сообщения чата из одного потока в другой.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Снова мы используем thread::spawn для создания нового потока, а затем используем move для перемещения tx в замыкание, чтобы порождённый поток владел tx. Порождённому потоку необходимо владеть передатчиком, чтобы иметь возможность отправлять сообщения через канал.

Передатчик имеет метод send, который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result<T, E>, поэтому, если приёмник уже уничтожен и некуда отправлять значение, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap, чтобы паниковать в случае ошибки. Но в реальном приложении мы бы обработали это properly: вернитесь к Главе 9, чтобы повторить стратегии правильной обработки ошибок.

В Листинге 16-8 мы получим значение от приёмника в главном потоке. Это похоже на извлечение резиновой уточки из воды в конце реки или получение сообщения чата.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

У приёмника есть два полезных метода: recv и try_recv. Мы используем recv, сокращение от receive (получить), который будет блокировать выполнение главного потока и ждать, пока значение не будет отправлено по каналу. Как только значение отправлено, recv вернёт его в Result<T, E>. Когда передатчик закроется, recv вернёт ошибку, сигнализируя, что больше значений не будет.

Метод try_recv не блокирует, а вместо этого немедленно возвращает Result<T, E>: значение Ok, содержащее сообщение, если оно доступно, и значение Err, если в этот раз сообщений нет. Использование try_recv полезно, если у этого потока есть другая работа, пока он ждёт сообщений: мы могли бы написать цикл, который время от времени вызывает try_recv, обрабатывает сообщение, если оно доступно, а в противном случае выполняет другую работу в течение короткого времени, пока снова не проверит.

Мы использовали recv в этом примере для простоты; у нас нет другой работы для главного потока, кроме как ждать сообщения, поэтому блокировка главного потока уместна.

Когда мы запустим код из Листинга 16-8, мы увидим значение, выведенное из главного потока:

Got: hi

Отлично!

Передача владения через каналы

Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный конкурентный код. Предотвращение ошибок в конкурентном программировании — это преимущество мышления о владении во всех ваших программах на Rust. Давайте проведём эксперимент, чтобы показать, как каналы и владение работают вместе, чтобы предотвратить проблемы: мы попытаемся использовать значение val в порождённом потоке после того, как отправили его по каналу. Попробуйте скомпилировать код в Листинге 16-9, чтобы понять, почему этот код не разрешён.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Здесь мы пытаемся напечатать val после того, как отправили его по каналу через tx.send. Разрешить это было бы плохой идеей: как только значение отправлено в другой поток, этот поток может изменить или удалить его до того, как мы попытаемся использовать значение снова. Потенциально модификации другого потока могут вызвать ошибки или неожиданные результаты из-за inconsistent или несуществующих данных. Однако Rust выдаёт нам ошибку, если мы пытаемся скомпилировать код из Листинга 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Наша ошибка конкурентности вызвала ошибку времени компиляции. Функция send забирает владение своим параметром, и когда значение перемещается, приёмник забирает владение им. Это мешает нам случайно использовать значение again после его отправки; система владения проверяет, что всё в порядке.

Отправка нескольких значений

Код в Листинге 16-8 скомпилировался и запустился, но он не показал нам чётко, что два отдельных потока общаются друг с другом через канал.

В Листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в Листинге 16-8 выполняется конкурентно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу на секунду между каждым сообщением.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить в главный поток. Мы перебираем их, отправляя каждую individually, и делаем паузу между каждой, вызывая функцию thread::sleep со значением Duration в одну секунду.

В главном потоке мы больше не вызываем функцию recv явно: вместо этого мы обращаемся с rx как с итератором. Для каждого полученного значения мы выводим его. Когда канал закрывается, итерация заканчивается.

При запуске кода из Листинга 16-10 вы должны увидеть следующий вывод с паузой в одну секунду между каждой строкой:

Got: hi
Got: from
Got: the
Got: thread

Поскольку у нас нет никакого кода, который приостанавливает или задерживает выполнение в цикле for в главном потоке, мы можем сказать, что главный поток ждет получения значений от порождённого потока.

Создание нескольких производителей

Ранее мы упоминали, что mpsc была аббревиатурой для multiple producer, single consumer (несколько производителей, один потребитель). Давайте используем mpsc и расширим код в Листинге 16-10, чтобы создать несколько потоков, которые все отправляют значения одному и тому же приёмнику. Мы можем сделать это, клонировав передатчик, как показано в Листинге 16-11.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

На этот раз, прежде чем создать первый порождённый поток, мы вызываем clone для передатчика. Это даст нам новый передатчик, который мы можем передать первому порождённому потоку. Мы передаём оригинальный передатчик второму порождённому потоку. Это даёт нам два потока, каждый из которых отправляет разные сообщения одному приёмнику.

Когда вы запустите код, ваш вывод должен выглядеть примерно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Вы можете увидеть значения в другом порядке, в зависимости от вашей системы. Это то, что делает конкурентность интересной, а также сложной. Если вы поэкспериментируете с thread::sleep, давая ему различные значения в разных потоках, каждый запуск будет более недетерминированным и будет создавать different вывод каждый раз.

Теперь, когда мы рассмотрели, как работают каналы, давайте посмотрим на другой метод конкурентности.