Дополнительные темы по async/await

Модульные тесты (Unit tests)

Как писать модульные тесты для асинхронного кода? Проблема в том, что await можно использовать только внутри асинхронного контекста, а модульные тесты в Rust по умолчанию не являются асинхронными. К счастью, большинство сред выполнения предоставляют удобные атрибуты для тестов, аналогичные атрибуту для async main. При использовании Tokio это выглядит так:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_something() {
  // Пишите тест здесь, включая все нужные вам `await`.
}
}

Существует множество способов настройки теста, подробности см. в документации.

Атрибут #[tokio::test] автоматически создает среду выполнения Tokio для выполнения асинхронного теста. Это позволяет использовать .await внутри тестовой функции так же, как и в обычном асинхронном коде.

Для тестирования асинхронных функций, которые могут возвращать ошибки, вы можете использовать assert!(result.is_ok()) или assert!(result.is_err()) для проверки результата Result.

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_fetch_data_error() {
    let invalid_url = "https://invalid-url";
    let result = fetch_data(invalid_url).await;
    assert!(result.is_err());
}
}

Более сложные темы тестирования асинхронного кода (например, тестирование на состояние гонки, взаимоблокировки и т.д.) будут рассмотрены позже в этом руководстве.

Блокировки и отмена (Cancellation)

Блокировки и отмена — это важные концепции, которые необходимо учитывать при программировании на асинхронном Rust. Это не особенности конкретных функций, а повсеместные свойства системы, которые вы должны понимать, чтобы писать корректный код.

Блокирующий IO (Blocking IO)

Мы говорим, что поток (здесь речь идет о потоках ОС, а не об асинхронных задачах) заблокирован, когда он не может делать прогресс. Обычно это происходит потому, что он ожидает завершения задачи от ОС (чаще всего операции ввода-вывода). Важно, что пока поток заблокирован, ОС знает, что его не нужно планировать, чтобы другие потоки могли работать. Это нормально в многопоточной программе, так как позволяет другим потокам работать, пока заблокированный поток ожидает. Однако в асинхронной программе есть другие задачи, которые должны быть запланированы на том же потоке ОС, но ОС ничего о них не знает и заставляет весь поток ожидать. Это означает, что вместо одной задачи, ожидающей завершения своего IO (что нормально), ждать вынуждены многие задачи (что уже плохо).

Скоро мы поговорим о неблокирующем/асинхронном IO. Пока просто знайте, что неблокирующий IO — это IO, о котором знает асинхронная среда выполнения, и поэтому ждать будет только текущая задача, а поток не будет заблокирован. Крайне важно использовать в асинхронной задаче только неблокирующий IO, и никогда — блокирующий IO (который является единственным видом, предоставляемым стандартной библиотекой Rust).

Блокирующие вычисления (Blocking computation)

Вы также можете заблокировать поток, выполняя вычисления (это не совсем то же самое, что блокирующий IO, поскольку ОС не участвует, но эффект схож). Если у вас есть длительное вычисление (с блокирующим IO или без него) без уступки управления среде выполнения, то эта задача никогда не даст планировщику среды выполнения шанс запланировать другие задачи. Помните, что асинхронное программирование использует кооперативную многозадачность. Здесь задача не сотрудничает, поэтому другие задачи не получат возможности выполнить свою работу. Способы решения этой проблемы мы обсудим позже.

Существует множество других способов заблокировать целый поток, и мы еще не раз вернемся к теме блокировок в этом руководстве.

Отмена (Cancellation)

Отмена означает остановку выполнения фьючерса (или задачи). Поскольку в Rust (в отличие от многих других систем async/await) фьючерсы должны продвигаться вперед внешней силой (например, асинхронной средой выполнения), если фьючерс больше не продвигается, то он не будет выполняться. Если фьючерс удаляется (помните, фьючерс — это просто обычный старый объект Rust), то он больше не может делать прогресс и считается отмененным.

Инициировать отмену можно несколькими способами:

  • Простым удалением фьючерса (если он вам принадлежит).
  • Вызовом abort у JoinHandle задачи (или использованием AbortHandle).
  • Через CancellationToken (что требует, чтобы отменяемый фьючерс отслеживал токен и кооперативно отменял сам себя).
  • Неявно, с помощью функции или макроса, такого как select.

Второй и третий способы специфичны для Tokio, хотя большинство сред выполнения предоставляют аналогичные возможности. Использование CancellationToken требует сотрудничества со стороны отменяемого фьючерса, а другие способы — нет. В этих других случаях отмененный фьючерс не получит уведомления об отмене и не получит возможности очистить ресурсы (кроме своего деструктора). Обратите внимание, что даже если у фьючерса есть токен отмены, он все равно может быть отменен другими методами, которые не активируют этот токен.

С точки зрения написания асинхронного кода (в асинхронных функциях, блоках, фьючерсах и т.д.), код может остановить выполнение в любой точке await (включая скрытые внутри макросов) и никогда больше не возобновить его. Чтобы ваш код был корректным (в частности, безопасным к отмене - cancellation safe), он должен работать правильно как при нормальном завершении, так и при завершении в любой точке await1.

#![allow(unused)]
fn main() {
async fn some_function(input: Option<Input>) {
    let Some(input) = input else {
        return;           // Может завершиться здесь (`return`).
    };

    let x = foo(input)?;  // Может завершиться здесь (`?`).

    let y = bar(x).await; // Может завершиться здесь (`await`).

    // ...

    //                       Может завершиться здесь (неявный return).
}
}

Пример того, как это может пойти не так: если асинхронная функция читает данные во внутренний буфер, а затем ожидает (await) следующий фрагмент данных. Если чтение данных является деструктивным (т.е. их нельзя перечитать из исходного источника) и асинхронная функция отменена, то внутренний буфер будет удален, и данные в нем будут потеряны. Важно учитывать, как на фьючерс и любые данные, к которым он обращается, повлияет отмена фьючерса, его перезапуск или запуск нового фьючерса, который обращается к тем же данным.

Мы еще несколько раз вернемся к отмене и безопасности отмены в этом руководстве, а в справочном разделе есть целая глава, посвященная этой теме.

Асинхронные блоки (Async blocks)

Обычный блок ({ ... }) группирует код в исходном тексте и создает область видимости для имен. Во время выполнения блок выполняется по порядку и вычисляется в значение своего последнего выражения (или в тип-единицу (), если завершающего выражения нет).

Подобно асинхронным функциям, асинхронный блок — это отложенная версия обычного блока. Асинхронный блок группирует код и имена, но во время выполнения он не выполняется немедленно и вычисляется в фьючерс. Чтобы выполнить блок и получить результат, его необходимо awaitить. Например:

#![allow(unused)]
fn main() {
let s1 = {
    let a = 42;
    format!("The answer is {a}")
};

let s2 = async {
    let q = question().await;
    format!("The question is {q}")
};
}

Если бы мы выполнили этот фрагмент, s1 была бы строкой, которую можно напечатать, а s2 была бы фьючерсом; question() даже не была бы вызвана. Чтобы напечатать s2, нам сначала нужно сделать s2.await.

Асинхронный блок — это простейший способ начать асинхронный контекст и создать фьючерс. Он обычно используется для создания небольших фьючерсов, которые используются только в одном месте.

К сожалению, управление потоком выполнения с асинхронными блоками немного своеобразно. Поскольку асинхронный блок создает фьючерс, а не выполняется напрямую, он ведет себя больше как функция, чем обычный блок, с точки зрения управления потоком. break и continue не могут «пройти сквозь» асинхронный блок, как это бывает с обычными блоками; вместо этого вам придется использовать return:

#![allow(unused)]
fn main() {
loop {
    {
        if ... {
            // ok
            continue;
        }
    }

    async {
        if ... {
            // не ok
            // continue;

            // ok - продолжает со следующей итерации `loop`, хотя учтите, что если бы в цикле после
            // асинхронного блока был код, он бы выполнился.
            return;
        }
    }.await
}
}

Чтобы реализовать break, вам нужно будет проверить значение блока (распространенной идиомой является использование ControlFlow для значения блока, что также позволяет использовать ?).

Аналогично, ? внутри асинхронного блока завершит выполнение фьючерса при ошибке, в результате чего awaitенный блок примет значение ошибки, но не выйдет из окружающей функции (как это сделал бы ? в обычном блоке). Для этого вам понадобится еще один ? после await:

#![allow(unused)]
fn main() {
async {
    let x = foo()?;   // Этот `?` выходит только из асинхронного блока, а не из окружающей функции.
    consume(x);
    Ok(())
}.await?
}

Раздражает, что это часто сбивает с толку компилятор, поскольку (в отличие от функций) «возвращаемый» тип асинхронного блока не указан явно. Вам, вероятно, потребуется добавить некоторые аннотации типов к переменным или использовать турбо-рыбу (turbofish) для указания типов, например, Ok::<_, MyError>(()) вместо Ok(()) в приведенном выше примере.

Функция, возвращающая асинхронный блок, довольно похожа на асинхронную функцию. Написание async fn foo() -> ... { ... } примерно эквивалентно fn foo() -> ... { async { ... } }. Фактически, с точки зрения вызывающей стороны они эквивалентны, и переход от одной формы к другой не является критическим изменением. Более того, вы можете переопределить одну другой при реализации асинхронного трейта (см. ниже). Однако вам придется скорректировать тип, сделав Future явным в версии с асинхронным блоком: async fn foo() -> Foo становится fn foo() -> impl Future<Output = Foo> (вам также, возможно, потребуется сделать явными другие ограничения, например, Send и 'static).

Обычно следует предпочитать версию с асинхронной функцией, поскольку она проще и понятнее. Однако версия с асинхронным блоком более гибкая, поскольку вы можете выполнить некоторый код при вызове функции (написав его вне асинхронного блока) и некоторый код, когда результат ожидается (код внутри асинхронного блока).

Асинхронные замыкания (Async closures)

  • Замыкания
    • Скоро будет (https://github.com/rust-lang/rust/pull/132706, https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing.html)
    • Асинхронные блоки в замыканиях против асинхронных замыканий

Время жизни и заимствование (Lifetimes and borrowing)

  • Упоминалось о времени жизни 'static выше
  • Ограничения времени жизни для фьючерсов (Future + '_ и т.д.)
  • Заимствование across await points
  • Я не знаю, я уверен, что с асинхронными функциями есть еще проблемы с временем жизни...

Ограничения Send + 'static на фьючерсах

  • Почему они там есть, многопоточные среды выполнения
  • spawn_local, чтобы избежать их
  • Что делает асинхронную функцию Send + 'static и как исправлять ошибки с этим связанные

Асинхронные трейты (Async traits)

  • Синтаксис
    • Проблема с Send + 'static и способы ее обхода
      • trait_variant
      • Явный future
      • Нотация возвращаемого типа (Return Type Notation, RTN) (https://blog.rust-lang.org/inside-rust/2024/09/26/rtn-call-for-testing.html)
  • Переопределение (overriding)
    • Нотация future против async для методов
  • Безопасность объектов (object safety)
  • Правила захвата (capture rules) (https://blog.rust-lang.org/2024/09/05/impl-trait-capture-rules.html)
  • История и крейт async-trait

Рекурсия (Recursion)

  • Разрешена (относительно недавно), но требует явного помещения в Box (boxing).
    • Ссылка вперед на фьючерсы, закрепление (pinning)
    • https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
    • https://blog.rust-lang.org/2024/03/21/Rust-1.77.0.html#support-for-recursion-in-async-fn
    • Макрос async-recursion (https://docs.rs/async-recursion/latest/async_recursion/)

Рекурсивные асинхронные функции создают тип конечного автомата, который содержит сам себя, что приводит к типу бесконечного размера. Компилятор выдаст ошибку: "recursion in an async fn requires boxing".

Для исправления этой проблемы можно использовать крейт async_recursion, который предоставляет атрибут #[async_recursion] для автоматического преобразования функции. Этот макрос изменяет функцию так, чтобы она возвращала запакованный (boxed) фьючерс.

#![allow(unused)]
fn main() {
use async_recursion::async_recursion;

#[async_recursion]
async fn fib(n: u32) -> u32 {
   match n {
       0 | 1 => 1,
       _ => fib(n-1).await + fib(n-2).await
   }
}
}

Макрос также позволяет контролировать ограничения трейтов для возвращаемого фьючерса с помощью опций (?Send) и (Sync).


  1. Интересно сравнить отмену в асинхронном программировании с отменой потоков. Отменить поток возможно (например, с помощью pthread_cancel в C; в Rust нет прямого способа сделать это), но это почти всегда очень и очень плохая идея, поскольку отменяемый поток может завершиться где угодно. В отличие от этого, отмена асинхронной задачи может произойти только в точке await. Как следствие, отмена потока ОС без завершения всего процесса происходит очень редко, и вам, как программисту, обычно не нужно об этом беспокоиться. Однако в асинхронном Rust отмена — это то, что может произойти. Мы обсудим, как с этим бороться, по мере нашего продвижения.