Async и Await

В этой главе мы начнем заниматься асинхронным программированием в Rust и познакомимся с ключевыми словами async и await.

async — это аннотация для функций (и других элементов, таких как трейты, к которым мы вернемся позже); await — это оператор, используемый в выражениях. Но прежде чем мы перейдем к этим ключевым словам, нам нужно охватить несколько основных концепций асинхронного программирования в Rust, что следует из обсуждения в предыдущей главе, здесь мы свяжем вещи напрямую с программированием на Rust.

Концепции async в Rust

Среда выполнения (Runtime)

Асинхронными задачами необходимо управлять и планировать их. Обычно задач больше, чем доступных ядер, поэтому их нельзя запустить все сразу. Когда одна задача останавливает выполнение, должна быть выбрана другая для выполнения. Если задача ожидает IO или какое-либо другое событие, ее не следует планировать, но когда это событие завершится, ее следует запланировать. Это требует взаимодействия с ОС и управления работой IO.

Многие языки программирования предоставляют среду выполнения (runtime). Обычно эта среда выполнения делает гораздо больше, чем просто управляет асинхронными задачами — она может управлять памятью (включая сборку мусора), участвовать в обработке исключений, предоставлять уровень абстракции над ОС или даже быть полноценной виртуальной машиной. Rust — это низкоуровневый язык, который стремится к минимальным накладным расходам времени выполнения. Поэтому асинхронная среда выполнения имеет гораздо более ограниченную область действия, чем среды выполнения многих других языков. Также существует множество способов проектирования и реализации асинхронной среды выполнения, поэтому Rust позволяет вам выбрать одну в зависимости от ваших требований, а не предоставляет одну. Это означает, что для начала работы с асинхронным программированием требуется дополнительный шаг.

Помимо запуска и планирования задач, среда выполнения должна взаимодействовать с ОС для управления асинхронным IO. Она также должна предоставлять функциональность таймеров для задач (что пересекается с управлением IO). Не существует строгих правил о том, как должна быть структурирована среда выполнения, но некоторые термины и разделение обязанностей являются общими:

  • Реактор (reactor) или цикл событий (event loop) или драйвер (driver) (эквивалентные термины): распределяет события IO и таймеров, взаимодействует с ОС и выполняет низкоуровневое продвижение выполнения вперед.
  • Планировщик (scheduler): определяет, когда задачи могут выполняться и на каких потоках ОС.
  • Исполнитель (executor) или среда выполнения (runtime): объединяет реактор и планировщик и представляет собой пользовательский API для запуска асинхронных задач; среда выполнения (runtime) также используется для обозначения всей библиотеки функциональности (например, всего в крейте Tokio, а не только исполнителя Tokio, который представлен типом Runtime).

Помимо исполнителя, как описано выше, крейт среды выполнения обычно включает множество вспомогательных трейтов и функций. Они могут включать трейты (например, AsyncRead) и реализации для IO, функциональность для распространенных задач IO, таких как сетевое взаимодействие или доступ к файловой системе, блокировки, каналы и другие примитивы синхронизации, утилиты для работы со временем, утилиты для работы с ОС (например, обработка сигналов), вспомогательные функции для работы с фьючерсами и потоками (асинхронные итераторы) или инструменты мониторинга и наблюдения. Мы рассмотрим многие из них в этом руководстве.

Существует множество асинхронных сред выполнения на выбор. Некоторые имеют очень разные политики планирования или оптимизированы для конкретной задачи или области. Для большей части этого руководства мы будем использовать среду выполнения Tokio. Это универсальная среда выполнения, и она является самой популярной в экосистеме. Это отличный выбор для начала работы и для production-использования. В некоторых обстоятельствах вы можете получить лучшую производительность или иметь возможность написать более простой код с другой средой выполнения. Позже в этом руководстве мы обсудим некоторые другие доступные среды выполнения и почему вы можете выбрать ту или иную, или даже написать свою собственную.

Чтобы начать работать как можно быстрее, вам нужно совсем немного шаблонного кода. Вам нужно включить крейт Tokio как зависимость в ваш Cargo.toml (как и любой другой крейт):

[dependencies]
tokio = { version = "1", features = ["full"] }

И вы будете использовать аннотацию tokio::main для вашей функции main, чтобы она могла быть асинхронной функцией (что в противном случае не разрешено в Rust):

#[tokio::main]
async fn main() { ... }

Вот и все! Вы готовы писать асинхронный код!

Аннотация #[tokio::main] инициализирует среду выполнения Tokio и запускает асинхронную задачу для выполнения кода в main. Позже в этом руководстве мы подробнее объясним, что делает эта аннотация и как использовать асинхронный код без нее (что даст вам больше гибкости).

Futures-rs и экосистема

TODO контекст и история, для чего нужен futures-rs — раньше использовался часто, сейчас, вероятно, не нужен, пересечение с Tokio и другими средами выполнения (иногда с тонкими семантическими различиями), почему он может вам понадобиться (работа с фьючерсами напрямую, особенно написание своих, потоки (streams), некоторые утилиты)

Другие вещи экосистемы — крейты Yosh, альтернативные среды выполнения, экспериментальные вещи, другие?

Фьючерсы и задачи

Базовой единицей асинхронной конкурентности в Rust является фьючерс (future). Фьючерс — это просто обычный старый объект Rust (обычно структура или перечисление), который реализует трейт 'Future'. Фьючерс представляет отложенное вычисление. То есть вычисление, которое будет готово в какой-то момент в будущем.

Мы много будем говорить о фьючерсах в этом руководстве, но проще начать, не слишком беспокоясь о них. Мы будем упоминать их довольно часто в следующих нескольких разделах, но мы не будем на самом деле определять их или использовать напрямую до конца. Один важный аспект фьючерсов заключается в том, что их можно комбинировать, чтобы создавать новые, «большие» фьючерсы (мы поговорим подробнее о том, как их можно комбинировать позже).

Я довольно неформально использовал термин «асинхронная задача» в предыдущей главе и этой. Я использовал этот термин для обозначения логической последовательности выполнения; аналогично потоку, но управляемому внутри программы, а не извне ОС. Часто полезно думать в терминах задач, однако в самом Rust нет концепции задачи, и этот термин используется для обозначения разных вещей! Это сбивает с толку! Что еще хуже, среды выполнения имеют концепцию задачи, и разные среды выполнения имеют немного разные концепции задач.

Отныне я буду стараться быть точным в терминологии, касающейся задач. Когда я использую просто «задача» (task), я имею в виду абстрактную концепцию последовательности вычислений, которая может происходить конкурентно с другими задачами. Я буду использовать «асинхронная задача» (async task) для обозначения точно того же, но в противопоставление задаче, реализованной как поток ОС. Я буду использовать «задача среды выполнения» (runtime's task) для обозначения любого вида задачи, которую представляет себе среда выполнения, и «задача tokio» (tokio task) (или какой-либо другой конкретной среды выполнения) для обозначения концепции задачи в Tokio.

Асинхронная задача в Rust — это просто фьючерс (обычно «большой» фьючерс, составленный из многих других). Другими словами, задача — это фьючерс, который выполняется. Однако бывают случаи, когда фьючерс «выполняется», не будучи задачей среды выполнения. Такой фьючерс интуитивно является задачей, но не задачей среды выполнения. Я уточню это, когда мы дойдем до примера.

Асинхронные функции

Ключевое слово async является модификатором для объявлений функций. Например, мы можем написать pub async fn send_to_server(...). Асинхронная функция — это просто функция, объявленная с использованием ключевого слова async, и это означает, что это функция, которая может выполняться асинхронно, другими словами, вызывающая сторона может выбрать не ждать завершения функции перед выполнением чего-то другого.

Более механически, когда асинхронная функция вызывается, тело не выполняется, как это было бы для обычной функции. Вместо этого тело функции и ее аргументы упаковываются в фьючерс, который возвращается вместо реального результата. Вызывающая сторона может затем решить, что делать с этим фьючерсом (если вызывающей стороне нужен результат «немедленно», то она будет awaitить фьючерс, см. следующий раздел).

Внутри асинхронной функции код выполняется обычным, последовательным образом1, асинхронность не меняет этого. Вы можете вызывать синхронные функции из асинхронных функций, и выполнение продолжается как обычно. Одна дополнительная вещь, которую вы можете делать внутри асинхронной функции, — использовать await для ожидания других асинхронных функций (или фьючерсов), что может привести к уступке управления, чтобы другая задача могла выполниться.

await

Мы заявили выше, что фьючерс — это вычисление, которое будет готово в какой-то момент в будущем. Чтобы получить результат этого вычисления, мы используем ключевое слово await. Если результат готов немедленно или может быть вычислен без ожидания, то await просто выполняет это вычисление для получения результата. Однако, если результат не готов, то await передает управление планировщику, чтобы другая задача могла продолжить (это кооперативная многозадачность, упомянутая в предыдущей главе).

Синтаксис использования await — some_future.await, т.е. это постфиксное ключевое слово, используемое с оператором .. Это означает, что его можно эргономично использовать в цепочках вызовов методов и обращений к полям.

Рассмотрим следующие функции:

#![allow(unused)]
fn main() {
// Асинхронная функция, но ей не нужно ничего ждать.
async fn add(a: u32, b: u32) -> u32 {
  a + b
}

async fn wait_to_add(a: u32, b: u32) -> u32 {
  sleep(1000).await;
  a + b
}
}

Если мы вызовем add(15, 3).await, то он немедленно вернет результат 18. Если мы вызовем wait_to_add(15, 3).await, мы в конечном итоге получим тот же ответ, но пока мы ждем, другая задача получит возможность запуститься.

В этом глупом примере вызов sleep является заменой выполнения какой-то длительной задачи, где нам приходится ждать результата. Обычно это операция IO, где результатом являются данные, прочитанные из внешнего источника, или подтверждение того, что запись во внешнее место назначения удалась. Чтение выглядит примерно так: let data = read(...).await?. В этом случае await заставит текущую задачу ждать, пока происходит чтение. Задача возобновит работу, когда чтение будет завершено (другие задачи могут выполнить некоторую работу, пока задача чтения ждет). Результатом чтения могут быть успешно прочитанные данные или ошибка (обрабатываемая оператором ?).

Обратите внимание, что если мы вызовем add или wait_to_add или read без использования .await, мы не получим никакого ответа!

Что?

Вызов асинхронной функции возвращает фьючерс, он не выполняет немедленно код внутри функции. Более того, фьючерс не выполняет никакой работы, пока его не await2. Это контрастирует с некоторыми другими языками, где асинхронная функция возвращает фьючерс, который начинает выполняться немедленно.

Это важный момент в асинхронном программировании на Rust. Через некоторое время это станет второй натурой, но это часто сбивает с толку начинающих, особенно тех, у кого есть опыт асинхронного программирования на других языках.

Важная интуиция о фьючерсах в Rust заключается в том, что они инертны. Чтобы выполнить какую-либо работу, их должно продвигать вперед внешняя сила (обычно асинхронная среда выполнения).

Мы описали await довольно операционно (он запускает фьючерс, производя результат), но мы говорили в предыдущей главе об асинхронных задачах и конкурентности, как await вписывается в эту ментальную модель? Во-первых, рассмотрим чисто последовательный код: логически, вызов функции просто выполняет код в функции (с некоторым присваиванием переменных). Другими словами, текущая задача продолжает выполнять следующий «кусок» кода, определенный функцией. Аналогично, в асинхронном контексте вызов неасинхронной функции просто продолжает выполнение с этой функции. Вызов асинхронной функции находит код для выполнения, но не выполняет его. await — это оператор, который продолжает выполнение текущей задачи, или, если текущая задача не может продолжить прямо сейчас, дает другой задаче возможность продолжить.

await может использоваться только внутри асинхронного контекста, пока это означает внутри асинхронной функции (позже мы увидим больше видов асинхронных контекстов). Чтобы понять почему, вспомните, что await может передавать управление среде выполнения, чтобы другая задача могла выполниться. Среда выполнения, которой можно передать управление, существует только в асинхронном контексте. Пока вы можете представить среду выполнения как глобальную переменную, доступную только в асинхронных функциях, позже мы объясним, как это работает на самом деле.

Наконец, для еще одной перспективы на await: мы упомянули ранее, что фьючерсы можно комбинировать, чтобы создавать «большие» фьючерсы. Асинхронные функции — это один из способов определить фьючерс, а await — один из способов комбинировать фьючерсы. Использование await на фьючерсе объединяет этот фьючерс в фьючерс, производимый асинхронной функцией, внутри которой он используется. Мы поговорим подробнее об этой перспективе и других способах комбинирования фьючерсов позже.

Некоторые примеры async/await

Давайте начнем с повторного посещения нашего примера «Hello, world!»:

// Define an async function.
async fn say_hello() {
    println!("hello, world!");
}

#[tokio::main] // Boilerplate which lets us write `async fn main`, we'll explain it later.
async fn main() {
    // Call an async function and await its result.
    say_hello().await;
}

Теперь вы должны узнавать шаблонный код вокруг main. Он нужен для инициализации среды выполнения Tokio и создания начальной задачи для запуска асинхронной функции main.

say_hello — это асинхронная функция, когда мы вызываем ее, мы должны следовать за вызовом .await, чтобы запустить ее как часть текущей задачи. Обратите внимание, что если вы удалите .await, то запуск программы ничего не делает! Вызов say_hello возвращает фьючерс, но он никогда не выполняется, поэтому println никогда не вызывается (компилятор, по крайней мере, предупредит вас).

Вот немного более реалистичный пример, взятый из учебника Tokio.

#[tokio::main]
async fn main() -> Result<()> {
    // Открываем соединение с адресом mini-redis.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Устанавливаем ключ "hello" со значением "world"
    client.set("hello", "world".into()).await?;

    // Получаем ключ "hello"
    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

Код немного интереснее, но мы essentially делаем то же самое — вызываем асинхронные функции, а затем ожидаем (await) выполнения результата. На этот раз мы используем ? для обработки ошибок — это работает так же, как в синхронном Rust.

При всей нашей болтовне о конкурентности, параллелизме и асинхронности, оба этих примера на 100% последовательны. Простой вызов и ожидание асинхронных функций не вводят никакой конкурентности, если нет других задач для планирования, пока ожидающая задача ждет. Чтобы доказать это себе, давайте посмотрим на другой простой (но надуманный) пример:

use std::io::{stdout, Write};
use tokio::time::{sleep, Duration};

async fn say_hello() {
    print!("hello, ");
    // Flush stdout so we see the effect of the above `print` immediately.
    stdout().flush().unwrap();
}

async fn say_world() {
    println!("world!");
}

#[tokio::main]
async fn main() {
    say_hello().await;
    // An async sleep function, puts the current task to sleep for 1s.
    sleep(Duration::from_millis(1000)).await;
    say_world().await;
}

Между выводом "hello" и "world" мы усыпляем текущую задачу3 на одну секунду. Наблюдайте, что происходит при запуске программы: она печатает "hello", ничего не делает в течение одной секунды, затем печатает "world". Это потому, что выполнение одной задачи является чисто последовательным. Если бы у нас была какая-то конкурентность, то эта одна секунда сна была бы отличной возможностью сделать другую работу, например, напечатать "world". Мы увидим, как это сделать, в следующем разделе.

Порождение задач (Spawning tasks)

Мы говорили об async и await как о способе запуска кода в асинхронной задаче. И мы сказали, что await может усыпить текущую задачу, пока она ждет IO или какое-либо другое событие. Когда это происходит, другая задача может запуститься, но откуда берутся эти другие задачи? Так же, как мы используем std::thread::spawn для порождения новой задачи, мы можем использовать tokio::spawn для порождения новой асинхронной задачи. Обратите внимание, что spawn — это функция Tokio, среды выполнения, а не из стандартной библиотеки Rust, потому что задачи — это чисто концепция среды выполнения.

Вот крошечный пример запуска асинхронной функции в отдельной задаче с помощью spawn:

use tokio::{spawn, time::{sleep, Duration}};

async fn say_hello() {
    // Wait for a while before printing to make it a more interesting race.
    sleep(Duration::from_millis(100)).await;
    println!("hello");
}

async fn say_world() {
    sleep(Duration::from_millis(100)).await;
    println!("world!");
}

#[tokio::main]
async fn main() {
    spawn(say_hello());
    spawn(say_world());
    // Wait for a while to give the tasks time to run.
    sleep(Duration::from_millis(1000)).await;
}

Аналогично последнему примеру, у нас есть две функции, печатающие "hello" и "world!". Но на этот раз мы запускаем их конкурентно (и параллельно), а не последовательно. Если вы запустите программу несколько раз, вы должны увидеть, что строки печатаются в обоих порядках — иногда сначала "hello", иногда сначала "world!". Классическая гонка в конкурентном программировании!

Давайте углубимся в то, что здесь происходит. В игре три концепции: фьючерсы, задачи и потоки. Функция spawn принимает фьючерс (который, помните, может состоять из многих более мелких фьючерсов) и запускает его как новую задачу Tokio. Задачи — это концепция, которую среда выполнения Tokio планирует и управляет (не отдельными фьючерсами). Tokio (в своей конфигурации по умолчанию) — это многопоточная среда выполнения, что означает, что когда мы порождаем новую задачу, эта задача может быть запущена на другом потоке ОС, чем задача, из которой она была порождена (она может быть запущена на том же потоке, или она может начаться на одном потоке, а затем быть перемещена на другой позже).

Итак, когда фьючерс порождается как задача, он выполняется конкурентно с задачей, из которой он был порожден, и любыми другими задачами. Он также может выполняться параллельно этим задачам, если он запланирован на другом потоке.

Подводя итог, когда мы пишем два оператора, следующих друг за другом в Rust, они выполняются последовательно (независимо от того, в асинхронном коде или нет). Когда мы пишем await, это не меняет конкурентность последовательных операторов. Например, foo(); bar(); строго последовательно — foo вызывается, а затем bar. Это верно, независимо от того, являются ли foo и bar асинхронными функциями или нет. foo().await; bar().await; также строго последовательно, foo полностью вычисляется, а затем bar полностью вычисляется. В обоих случаях другой поток может чередоваться с последовательным выполнением, и во втором случае другая асинхронная задача может чередоваться в точках await, но два оператора выполняются последовательно по отношению друг к другу в обоих случаях.

Если мы используем либо thread::spawn, либо tokio::spawn, мы вводим конкурентность и потенциально параллелизм, в первом случае между потоками, а во втором — между задачами.

Позже в руководстве мы увидим случаи, когда мы выполняем фьючерсы конкурентно, но никогда параллельно.

Соединение задач (Joining tasks)

Если мы хотим получить результат выполнения порожденной задачи, то порождающая задача может ждать ее завершения и использовать результат, это называется соединением (joining) задач (аналогично соединению потоков, и API для соединения похожи).

Когда задача порождается, функция spawn возвращает JoinHandle. Если вы просто хотите, чтобы задача выполняла свою работу, JoinHandle можно отбросить (удаление JoinHandle не влияет на порожденную задачу). Но если вы хотите, чтобы порождающая задача ждала завершения порожденной задачи, а затем использовала результат, вы можете awaitить JoinHandle, чтобы сделать это.

Например, давайте еще раз пересмотрим наш пример «Hello, world!»:

use tokio::{spawn, time::{sleep, Duration}};

async fn say_hello() {
    // Wait for a while before printing to make it a more interesting race.
    sleep(Duration::from_millis(100)).await;
    println!("hello");
}

async fn say_world() {
    sleep(Duration::from_millis(100)).await;
    println!("world");
}

#[tokio::main]
async fn main() {
    let handle1 = spawn(say_hello());
    let handle2 = spawn(say_world());
    
    let _ = handle1.await;
    let _ = handle2.await;

    println!("!");
}

Код похож на прошлый раз, но вместо простого вызова spawn мы сохраняем возвращенные JoinHandle и позже awaitим их. Поскольку мы ждем завершения этих задач перед выходом из функции main, нам больше не нужен sleep в main.

Две порожденные задачи все еще выполняются конкурентно. Если вы запустите программу несколько раз, вы должны увидеть оба порядка. Однако awaitенные join handles являются ограничением на конкурентность: восклицательный знак ('!') всегда будет напечатан последним (вы можете поэкспериментировать с перемещением println!("!"); относительно await. Вам, вероятно, также нужно будет изменить время сна, чтобы получить наблюдаемые эффекты).

Если бы мы сразу выполнили awaitили JoinHandle первого spawn вместо того, чтобы сохранить его и позже awaitить (т.е. написали spawn(say_hello()).await;), то мы бы породили другую задачу для запуска фьючерса 'hello', но порождающая задача ждала бы ее завершения, прежде чем делать что-либо еще. Другими словами, не было бы возможной конкурентности! Вы почти никогда не захотите делать это (потому что зачем тогда вообще spawn? Просто напишите последовательный код).

JoinHandle

Мы быстро рассмотрим JoinHandle немного глубже. Тот факт, что мы можем awaitить JoinHandle, является подсказкой, что JoinHandle сам по себе является фьючерсом. spawn — это не async функция, это обычная функция, которая возвращает фьючерс (JoinHandle). Она выполняет некоторую работу (для планирования задачи) перед возвратом фьючерса (в отличие от асинхронного фьючерса), поэтому нам не нужно awaitить spawn. Ожидание JoinHandle ждет завершения порожденной задачи, а затем возвращает результат. В приведенном выше примере не было результата, мы просто ждали завершения задачи. JoinHandle — это обобщенный тип, и его параметр типа — это тип, возвращаемый порожденной задачей. В приведенном выше примере тип был бы JoinHandle<()>, фьючерс, который дает String, производил бы JoinHandle с типом JoinHandle<String>.

awaitение JoinHandle возвращает Result (поэтому мы использовали let _ = ... в приведенном выше примере, это избегает предупреждения о неиспользуемом Result). Если порожденная задача завершилась успешно, то результат задачи будет в варианте Ok. Если задача запаниковала или была прервана (форма отмены), то результат будет Err, содержащий тип JoinError. Если вы не используете отмену через abort в своем проекте, то unwrapping результата JoinHandle.await является разумным подходом, поскольку это effectively распространяет панику из порожденной задачи в порождающую задачу.


  1. как и любой другой поток, поток, на котором выполняется асинхронная функция, может быть вытеснен (pre-empted) операционной системой и приостановлен, чтобы другой поток мог выполнить некоторую работу. Однако с точки зрения функции это не наблюдается без проверки данных, которые могли быть изменены другими потоками (и которые могли быть изменены другим потоком, выполняющимся параллельно без приостановки текущего потока).

  2. Или опрошен (polled), что является операцией более низкого уровня, чем await, и происходит за кулисами при использовании await. Мы поговорим об опросе позже, когда будем подробно говорить о фьючерсах.

  3. Обратите внимание, что здесь мы используем асинхронную функцию сна, если бы мы использовали sleep из std, мы бы усыпили весь поток. Это не имело бы никакого значения в этом игрушечном примере, но в реальной программе это означало бы, что другие задачи не могут быть запланированы на этом потоке в течение этого времени. Это очень плохо.