Применение конкурентности с помощью Async

В этом разделе мы применим async к некоторым из тех же задач конкурентности, которые мы решали с помощью потоков в Главе 16. Поскольку мы уже обсуждали там многие ключевые идеи, в этом разделе мы сосредоточимся на том, что отличает потоки от фьючерсов.

Во многих случаях API для работы с конкурентностью с использованием async очень похожи на те, что используются с потоками. В других случаях они оказываются совершенно разными. Даже когда API выглядят похоже между потоками и async, они часто имеют разное поведение — и почти всегда имеют разные характеристики производительности.

Создание новой задачи с помощью spawn_task

Первой операцией, которую мы рассмотрели в разделе ["Создание нового потока с помощью spawn"] Главы 16, был счет на двух отдельных потоках. Давайте сделаем то же самое, используя async. Крейт trpl предоставляет функцию spawn_task, которая очень похожа на API thread::spawn, и функцию sleep, которая является асинхронной версией API thread::sleep. Мы можем использовать их вместе для реализации примера со счетом, как показано в Листинге 17-6.

Файл: src/main.rs

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Листинг 17-6: Создание новой задачи для печати одного сообщения, пока основная задача печатает другое

В качестве отправной точки мы настраиваем нашу функцию main с trpl::block_on, чтобы наша функция верхнего уровня могла быть асинхронной.

Примечание: С этого момента и до конца главы каждый пример будет включать один и тот же оборачивающий код с trpl::block_on в main, поэтому мы часто будем опускать его, как мы это делаем с main. Не забудьте включить его в свой код!

Затем мы пишем два цикла внутри этого блока, каждый из которых содержит вызов trpl::sleep, который ждет полсекунды (500 миллисекунд) перед отправкой следующего сообщения. Мы помещаем один цикл в тело trpl::spawn_task, а другой — в цикл for верхнего уровня. Мы также добавляем .await после вызовов sleep.

Этот код ведет себя аналогично реализации на основе потоков — включая тот факт, что в вашем терминале при запуске сообщения могут появляться в другом порядке:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Эта версия останавливается, как только цикл for в теле основного асинхронного блока завершается, потому что задача, порожденная spawn_task, завершается, когда заканчивается функция main. Если вы хотите, чтобы она выполнялась до полного завершения задачи, вам нужно использовать дескриптор присоединения (join handle), чтобы дождаться завершения первой задачи. С потоками мы использовали метод join, чтобы "заблокироваться" до завершения работы потока. В Листинге 17-7 мы можем использовать .await для того же самого, потому что сам дескриптор задачи является фьючерсом. Его ассоциированный тип Output — это Result, поэтому мы также извлекаем из него значение (unwrap) после ожидания.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
}

Листинг 17-7: Использование await с дескриптором присоединения для выполнения задачи до завершения

Эта обновленная версия работает до завершения обоих циклов:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Пока что кажется, что async и потоки дают нам схожие результаты, просто с разным синтаксисом: использование .await вместо вызова join у дескриптора присоединения и ожидание вызовов sleep.

Более важное различие заключается в том, что нам не потребовалось порождать еще один поток операционной системы для этого. На самом деле, нам даже не нужно здесь порождать задачу. Поскольку асинхронные блоки компилируются в анонимные фьючерсы, мы можем поместить каждый цикл в асинхронный блок и заставить рантайм выполнить их оба до завершения с помощью функции trpl::join.

В разделе ["Ожидание завершения всех потоков"] Главы 16 мы показали, как использовать метод join для типа JoinHandle, возвращаемого при вызове std::thread::spawn. Функция trpl::join похожа, но предназначена для фьючерсов. Когда вы передаете ей два фьючерса, она производит один новый фьючерс, выходным значением которого является кортеж, содержащий выходные значения каждого переданного фьючерса после того, как они оба завершатся. Таким образом, в Листинге 17-8 мы используем trpl::join, чтобы дождаться завершения и fut1, и fut2. Мы не ожидаем fut1 и fut2 напрямую, а вместо этого ожидаем новый фьючерс, созданный trpl::join. Мы игнорируем выходное значение, потому что это просто кортеж, содержащий два значения unit-типа ().

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
}

Листинг 17-8: Использование trpl::join для ожидания двух анонимных фьючерсов

При запуске мы видим, что оба фьючерса выполняются до завершения:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Теперь вы будете видеть один и тот же порядок каждый раз, что сильно отличается от того, что мы видели с потоками и с trpl::spawn_task в Листинге 17-7. Это потому, что функция trpl::join является честной (fair), что означает, что она проверяет каждый фьючерс одинаково часто, чередуя их, и никогда не позволяет одному уйти далеко вперед, если другой готов к выполнению. В случае с потоками операционная система решает, какой поток проверять и как долго позволять ему работать. В асинхронном Rust рантайм решает, какую задачу проверять. (На практике детали усложняются, потому что асинхронный рантайм может использовать потоки операционной системы внутри себя как часть управления конкурентностью, поэтому гарантировать честность может быть сложнее для рантайма — но это все равно возможно!) Рантаймы не обязаны гарантировать честность для любой конкретной операции, и они часто предлагают разные API, позволяющие вам выбирать, нужна вам честность или нет.

Попробуйте некоторые из этих вариантов ожидания фьючерсов и посмотрите, что они делают:

  • Уберите асинхронный блок из одного или обоих циклов.
  • Ожидайте каждый асинхронный блок сразу после его определения.
  • Оберните только первый цикл в асинхронный блок и ожидайте результирующий фьючерс после тела второго цикла.

Для дополнительного испытания попробуйте догадаться, каким будет вывод в каждом случае, прежде чем запускать код!

Передача данных между двумя задачами с помощью обмена сообщениями

Совместное использование данных между фьючерсами также будет знакомым: мы снова воспользуемся передачей сообщений, но на этот раз с асинхронными версиями типов и функций. Мы пойдем немного другим путем, чем в разделе ["Передача данных между потоками с помощью обмена сообщениями"] Главы 16, чтобы проиллюстрировать некоторые ключевые различия между конкурентностью на основе потоков и на основе фьючерсов. В Листинге 17-9 мы начнем с одного асинхронного блока — не порождая отдельную задачу, как мы порождали отдельный поток.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
}

Листинг 17-9: Создание асинхронного канала и назначение двух его половин tx и rx

Здесь мы используем trpl::channel — асинхронную версию API канала с несколькими производителями и одним потребителем (multiple-producer, single-consumer), который мы использовали с потоками в Главе 16. Асинхронная версия API лишь немного отличается от версии на основе потоков: она использует изменяемый (mut), а не неизменяемый приемник rx, и ее метод recv производит фьючерс, который нам нужно ожидать (await), а не производит значение напрямую. Теперь мы можем отправлять сообщения от отправителя к получателю. Обратите внимание, что нам не нужно порождать отдельный поток или даже задачу; нам просто нужно ожидать вызов rx.recv.

Синхронный метод Receiver::recv в std::mpsc::channel блокируется, пока не получит сообщение. Метод trpl::Receiver::recv так не делает, потому что он асинхронный. Вместо блокировки он возвращает управление рантайму до тех пор, пока либо не будет получено сообщение, либо отправляющая сторона канала не закроется. Напротив, мы не ожидаем вызов send, потому что он не блокирующий. Ему и не нужно блокироваться, потому что канал, в который мы отправляем, является неограниченным (unbounded).

Примечание: Поскольку весь этот асинхронный код выполняется в асинхронном блоке внутри вызова trpl::block_on, всё внутри него может избегать блокировки. Однако код снаружи будет блокироваться на возврате из функции block_on. В этом и состоит вся суть функции trpl::block_on: она позволяет вам выбрать, где блокироваться на некотором наборе асинхронного кода и, следовательно, где происходит переход между синхронным и асинхронным кодом.

Обратите внимание на две вещи в этом примере. Во-первых, сообщение придет сразу. Во-вторых, хотя мы используем здесь фьючерс, конкурентности пока нет. Все в листинге происходит последовательно, точно так же, как если бы фьючерсов не было вовсе.

Давайте решим первую часть, отправив серию сообщений и делая паузы между ними, как показано в Листинге 17-10.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
}

Листинг 17-10: Отправка и получение нескольких сообщений через асинхронный канал и ожидание с помощью await между каждым сообщением

В дополнение к отправке сообщений нам нужно их получать. В данном случае, поскольку мы знаем, сколько сообщений придет, мы могли бы сделать это вручную, вызвав rx.recv().await четыре раза. Однако в реальном мире мы обычно будем ожидать неизвестное количество сообщений, поэтому нам нужно продолжать ждать, пока мы не определим, что сообщений больше нет.

В Листинге 16-10 мы использовали цикл for для обработки всех элементов, полученных из синхронного канала. Однако в Rust пока нет возможности использовать цикл for с асинхронно производимой серией элементов, поэтому нам нужно использовать цикл, который мы раньше не видели: условный цикл while let. Это цикловая версия конструкции if let, которую мы видели в разделе ["Краткий поток управления с if let и let else"] Главы 6. Цикл будет продолжать выполняться до тех пор, пока указанный в нем шаблон продолжает соответствовать значению.

Вызов rx.recv производит фьючерс, который мы ожидаем (await). Рантайм приостановит выполнение этого фьючерса, пока он не будет готов. Как только сообщение arrives, фьючерс будет разрешаться в Some(message) столько раз, сколько сообщений придет. Когда канал закроется, независимо от того, поступали ли сообщения, фьючерс вместо этого разрешится в None, указывая, что больше значений нет и, следовательно, нам следует прекратить опрос (polling) — то есть прекратить ожидание (await).

Цикл while let объединяет всё это вместе. Если результат вызова rx.recv().await — это Some(message), мы получаем доступ к сообщению и можем использовать его в теле цикла, точно так же, как с if let. Если результат None, цикл завершается. Каждый раз, когда цикл завершается, он снова достигает точки ожидания (await point), поэтому рантайм снова приостанавливает его до прибытия следующего сообщения.

Теперь код успешно отправляет и получает все сообщения. К сожалению, осталась пара проблем. Во-первых, сообщения не приходят с интервалом в полсекунды. Они приходят все сразу, через 2 секунды (2000 миллисекунд) после запуска программы. Во-вторых, эта программа также никогда не завершается! Вместо этого она ждет новые сообщения вечно. Вам нужно будет завершить ее с помощью Ctrl-C.

Код внутри одного асинхронного блока выполняется линейно

Давайте начнем с того, чтобы выяснить, почему сообщения приходят все сразу после полной задержки, а не с задержками между каждым. Внутри данного асинхронного блока порядок, в котором ключевые слова .await появляются в коде, также является порядком их выполнения при запуске программы.

В Листинге 17-10 есть только один асинхронный блок, поэтому всё в нем выполняется линейно. Конкурентности по-прежнему нет. Все вызовы tx.send происходят, перемежаясь со всеми вызовами trpl::sleep и их соответствующими точками ожидания (await points). Только после этого цикл while let получает возможность пройти через какие-либо точки ожидания для вызовов recv.

Чтобы добиться желаемого поведения, при котором задержка sleep происходит между каждым сообщением, нам нужно поместить операции tx и rx в их собственные асинхронные блоки, как показано в Листинге 17-11. Тогда рантайм сможет выполнять каждый из них отдельно с помощью trpl::join, как в Листинге 17-8. Еще раз: мы ожидаем результат вызова trpl::join, а не отдельные фьючерсы. Если бы мы ожидали отдельные фьючерсы последовательно, мы бы просто вернулись к последовательному потоку — именно то, чего мы пытаемся избежать.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
}

Листинг 17-11: Разделение отправки и получения в собственные асинхронные блоки и ожидание фьючерсов для этих блоков

С обновленным кодом из Листинга 17-11 сообщения печатаются с интервалом в 500 миллисекунд, а не все сразу через 2 секунды.

Перемещение владения в асинхронный блок

Однако программа по-прежнему никогда не завершается из-за того, как цикл while let взаимодействует с trpl::join:

  1. Фьючерс, возвращенный из trpl::join, завершается только тогда, когда оба переданных ему фьючерса завершены.
  2. Фьючерс tx_fut завершается, как только он заканчивает ожидание после отправки последнего сообщения в vals.
  3. Фьючерс rx_fut не завершится, пока не закончится цикл while let.
  4. Цикл while let не закончится, пока ожидание rx.recv не даст None.
  5. Ожидание rx.recv вернет None только тогда, когда другой конец канала закроется.
  6. Канал закроется только если мы вызовем rx.close или когда отправляющая сторона, tx, будет удалена (dropped).
  7. Мы нигде не вызываем rx.close, а tx не будет удалена до тех пор, пока не завершится самый внешний асинхронный блок, переданный в trpl::block_on.
  8. Блок не может завершиться, потому что он заблокирован на завершении trpl::join, что возвращает нас к началу этого списка.

Сейчас асинхронный блок, в котором мы отправляем сообщения, только заимствует (borrows) tx, потому что отправка сообщения не требует владения. Но если бы мы могли переместить (move) tx в этот асинхронный блок, он был бы удален, как только этот блок завершится. В разделе ["Захват ссылок или перемещение владения"] Главы 13 вы узнали, как использовать ключевое слово move с замыканиями, и, как обсуждалось в разделе ["Использование замыканий move с потоками"] Главы 16, нам часто нужно перемещать данные в замыкания при работе с потоками. Та же базовая динамика применима к асинхронным блокам, поэтому ключевое слово move работает с асинхронными блоками так же, как и с замыканиями.

В Листинге 17-12 мы изменяем блок, используемый для отправки сообщений, с async на async move.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
}

Листинг 17-12: Пересмотренная версия кода из Листинга 17-11, которая корректно завершается после выполнения

Когда мы запускаем эту версию кода, она корректно завершается после отправки и получения последнего сообщения. Далее давайте посмотрим, что нужно изменить, чтобы отправлять данные из более чем одного фьючерса.

Объединение нескольких фьючерсов с помощью макроса join!

Этот асинхронный канал также является каналом с несколькими производителями, поэтому мы можем вызвать clone на tx, если хотим отправлять сообщения из нескольких фьючерсов, как показано в Листинге 17-13.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
}

Листинг 17-13: Использование нескольких производителей с асинхронными блоками

Сначала мы клонируем tx, создавая tx1 вне первого асинхронного блока. Мы перемещаем tx1 в этот блок так же, как делали это ранее с tx. Затем, позже, мы перемещаем оригинальный tx в новый асинхронный блок, где отправляем больше сообщений с немного более медленной задержкой. Мы размещаем этот новый асинхронный блок после блока для приема сообщений, но он с тем же успехом мог бы быть и до него. Ключевым моментом является порядок, в котором ожидаются фьючерсы, а не порядок их создания.

Оба асинхронных блока для отправки сообщений должны быть блоками async move, чтобы и tx, и tx1 были удалены (dropped) при завершении этих блоков. В противном случае мы снова окажемся в том же бесконечном цикле, с которого начинали.

Наконец, мы переходим от trpl::join к trpl::join!, чтобы обработать дополнительный фьючерс: макрос join! ожидает произвольное количество фьючерсов, когда мы знаем их количество на этапе компиляции. Мы обсудим ожидание коллекции из неизвестного числа фьючерсов позже в этой главе.

Теперь мы видим все сообщения от обоих отправляющих фьючерсов, и, поскольку отправляющие фьючерсы используют немного разные задержки после отправки, сообщения также принимаются с этими разными интервалами:

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Мы исследовали, как использовать передачу сообщений для отправки данных между фьючерсами, как код внутри асинхронного блока выполняется последовательно, как переместить владение в асинхронный блок и как объединить несколько фьючерсов. Далее давайте обсудим, как и зачем сообщать рантайму, что он может переключиться на другую задачу.