Возврат управления рантайму

Вспомните из раздела ["Наша первая асинхронная программа"], что в каждой точке ожидания (await point) Rust дает рантайму возможность приостановить задачу и переключиться на другую, если ожидаемый фьючерс еще не готов. Верно и обратное: Rust приостанавливает асинхронные блоки и возвращает управление рантайму только в точках ожидания. Всё, что находится между точками ожидания, выполняется синхронно.

Это означает, что если вы выполняете много работы в асинхронном блоке без точек ожидания, этот фьючерс будет блокировать прогресс любых других фьючерсов. Вы можете иногда слышать, что это называют ситуацией, когда один фьючерс "морит голодом" (starving) другие фьючерсы. В некоторых случаях это может быть не страшно. Однако если вы выполняете какое-то ресурсоемкое начальное действие или длительную работу, или если у вас есть фьючерс, который будет постоянно выполнять какую-то определенную задачу, вам нужно подумать о том, когда и где возвращать управление рантайму.

Давайте смоделируем длительную операцию, чтобы проиллюстрировать проблему "голодания", а затем исследуем, как ее решить. В Листинге 17-14 представлена функция slow.

Файл: src/main.rs

#![allow(unused)]
fn main() {
fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
}

Листинг 17-14: Использование thread::sleep для имитации медленных операций

Этот код использует std::thread::sleep вместо trpl::sleep, поэтому вызов slow будет блокировать текущий поток на некоторое количество миллисекунд. Мы можем использовать slow для имитации реальных операций, которые являются одновременно длительными и блокирующими.

В Листинге 17-15 мы используем slow, чтобы сымитировать выполнение такой работы, ограниченной по CPU, в паре фьючерсов.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
}

Листинг 17-15: Вызов slow для имитации выполнения медленных операций

Каждый фьючерс возвращает управление рантайму только после выполнения целой серии медленных операций. Если вы запустите этот код, вы увидите такой вывод:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

Как и в Листинге 17-5, где мы использовали trpl::select для "гонки" фьючерсов, загружающих два URL, select все равно завершается, как только a готов. Однако между вызовами slow в двух фьючерсах нет чередования. Фьючерс a выполняет всю свою работу до тех пор, пока не будет достигнут вызов trpl::sleep, затем фьючерс b выполняет всю свою работу до тех пор, пока не будет достигнут его собственный вызов trpl::sleep, и, наконец, фьючерс a завершается. Чтобы позволить обоим фьючерсам прогрессировать между их медленными задачами, нам нужны точки ожидания, чтобы мы могли вернуть управление рантайму. Это означает, что нам нужно что-то, что мы можем ожидать (await)!

Мы уже видим, что такая передача управления происходит в Листинге 17-15: если бы мы убрали trpl::sleep в конце фьючерса a, он бы завершился, а фьючерс b вообще не запустился бы. Давайте попробуем использовать функцию trpl::sleep в качестве отправной точки для того, чтобы позволить операциям поочередно прогрессировать, как показано в Листинге 17-16.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };
}

Листинг 17-16: Использование trpl::sleep для того, чтобы позволить операциям поочередно прогрессировать

Мы добавили вызовы trpl::sleep с точками ожидания между каждым вызовом slow. Теперь работа двух фьючерсов чередуется:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

Фьючерс a все еще работает некоторое время, прежде чем передать управление b, потому что он вызывает slow до любого вызова trpl::sleep, но после этого фьючерсы меняются местами каждый раз, когда один из них достигает точки ожидания. В данном случае мы сделали это после каждого вызова slow, но мы могли бы разбить работу любым способом, который имеет для нас наибольший смысл.

Однако на самом деле мы не хотим "спать" здесь: мы хотим прогрессировать как можно быстрее. Нам просто нужно вернуть управление рантайму. Мы можем сделать это напрямую, используя функцию trpl::yield_now. В Листинге 17-17 мы заменяем все эти вызовы trpl::sleep на trpl::yield_now.

Файл: src/main.rs

#![allow(unused)]
fn main() {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };
}

Листинг 17-17: Использование yield_now для того, чтобы позволить операциям поочередно прогрессировать

Этот код более четко отражает фактическое намерение и может быть значительно быстрее, чем использование sleep, потому что таймеры, такие как тот, что используется в sleep, часто имеют ограничения на свою точность. Версия sleep, которую мы используем, например, всегда будет "спать" как минимум миллисекунду, даже если мы передадим ей Duration в одну наносекунду. Повторим: современные компьютеры быстры — они могут сделать очень многое за одну миллисекунду!

Это означает, что async может быть полезен даже для задач, ограниченных по CPU, в зависимости от того, что еще делает ваша программа, потому что он предоставляет полезный инструмент для структурирования отношений между различными частями программы (но ценой накладных расходов на асинхронную state machine). Это форма кооперативной многозадачности (cooperative multitasking), где каждый фьючерс имеет возможность определять, когда он передает управление через точки ожидания. Следовательно, каждый фьючерс также несет ответственность за то, чтобы избегать блокировки на слишком долгое время. В некоторых встраиваемых (embedded) операционных системах на основе Rust это единственный вид многозадачности!

В реальном коде вы, конечно, обычно не будете чередовать вызовы функций с точками ожидания в каждой строке. Хотя передача управления таким образом относительно недорога, она не бесплатна. Во многих случаях попытка разбить задачу, ограниченную по CPU, может сделать ее значительно медленнее, поэтому иногда для общей производительности лучше позволить операции ненадолго заблокироваться. Всегда проводите замеры, чтобы увидеть, каковы реальные узкие места производительности вашего кода. Однако лежащая в основе динамика важна, и о ней стоит помнить, если вы видите, что много работы выполняется последовательно, хотя вы ожидали, что она будет выполняться конкурентно!

Создание наших собственных асинхронных абстракций

Мы также можем комбинировать фьючерсы вместе для создания новых паттернов. Например, мы можем построить функцию timeout с помощью уже имеющихся асинхронных строительных блоков. Когда мы закончим, результатом будет еще один строительный блок, который мы могли бы использовать для создания еще более сложных асинхронных абстракций.

Листинг 17-18 показывает, как мы ожидаем, что этот timeout будет работать с медленным фьючерсом.

Файл: src/main.rs

#![allow(unused)]
fn main() {
// [Этот код не компилируется!]
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
}

Листинг 17-18: Использование нашей воображаемой функции timeout для выполнения медленной операции с ограничением по времени

Давайте реализуем это! Для начала давайте подумаем об API для timeout:

  • Она сама должна быть асинхронной функцией, чтобы мы могли ее ожидать (await).
  • Ее первый параметр должен быть фьючерсом для выполнения. Мы можем сделать его обобщенным (generic), чтобы он работал с любым фьючерсом.
  • Ее второй параметр — это максимальное время ожидания. Если мы используем Duration, это упростит передачу в trpl::sleep.
  • Она должна возвращать Result. Если фьючерс завершается успешно, Result будет Ok со значением, произведенным фьючерсом. Если сначала истекает таймаут, Result будет Err с длительностью (duration), которую таймаут ожидал.

Листинг 17-19 показывает это объявление.

Файл: src/main.rs

#![allow(unused)]
fn main() {
// [Этот код не компилируется!]
async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Здесь будет наша реализация!
}
}

Листинг 17-19: Определение сигнатуры функции timeout

Это удовлетворяет нашим целям для типов. Теперь давайте подумаем о поведении, которое нам нужно: мы хотим устроить "гонку" между переданным фьючерсом и длительностью. Мы можем использовать trpl::sleep, чтобы создать фьючерс-таймер из длительности, и использовать trpl::select, чтобы запустить этот таймер вместе с фьючерсом, переданным вызывающей стороной.

В Листинге 17-20 мы реализуем timeout, сопоставляя результат ожидания trpl::select.

Файл: src/main.rs

#![allow(unused)]
fn main() {
use trpl::Either;

// --snip--

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
}

Листинг 17-20: Определение timeout с помощью select и sleep

Реализация trpl::select не является честной (fair): она всегда опрашивает (polls) аргументы в том порядке, в котором они переданы (другие реализации select могут случайным образом выбирать, какой аргумент опрашивать первым). Таким образом, мы передаем future_to_try в select первым, чтобы у него был шанс завершиться, даже если max_time — очень короткая длительность. Если future_to_try завершится первым, select вернет Left с выходным значением от future_to_try. Если таймер завершится первым, select вернет Right с выходным значением таймера ().

Если future_to_try успешен, и мы получаем Left(output), мы возвращаем Ok(output). Если вместо этого истекает таймер сна sleep, и мы получаем Right(()), мы игнорируем () с помощью _ и возвращаем Err(max_time).

Таким образом, у нас есть работающий timeout, построенный из двух других асинхронных помощников. Если мы запустим наш код, он выведет сообщение об ошибке после таймаута:

Failed after 2 seconds

Поскольку фьючерсы комбинируются с другими фьючерсами, вы можете строить действительно мощные инструменты, используя меньшие асинхронные строительные блоки. Например, вы можете использовать этот же подход, чтобы комбинировать таймауты с повторными попытками (retries), и, в свою очередь, использовать их с такими операциями, как сетевые вызовы (как те, что в Листинге 17-5).

На практике вы обычно будете работать напрямую с async и await, а во вторую очередь — с такими функциями, как select, и макросами, такими как join!, чтобы контролировать, как выполняются самые внешние фьючерсы.

Мы рассмотрели несколько способов работы с несколькими фьючерсами одновременно. Далее мы посмотрим, как мы можем работать с несколькими фьючерсами в последовательности с течением времени с помощью потоков (streams).