Async: Что такое блокировка?
Опубликовано 2020-12-21
Автор: Alice Ryhl
Функция async/await в Rust реализована с использованием механизма, известного как кооперативная многозадачность (cooperative scheduling), и это имеет важные последствия для тех, кто пишет асинхронный код на Rust.
Целевая аудитория этой статьи — новые пользователи асинхронного Rust. В примерах я буду использовать рантайм Tokio, но поднятые вопросы применимы к любому асинхронному рантайму.
Если вы запомните из этой статьи лишь одну вещь, пусть это будет она:
Асинхронный код никогда не должен проводить много времени, не достигая .await.
Блокирующий vs. неблокирующий код
Наивный способ написать приложение, которое работает над многими задачами одновременно, — это порождать новый поток для каждой задачи. Если количество задач невелико, это perfectly fine решение, но когда количество задач становится большим, вы в конечном итоге столкнетесь с проблемами из-за большого количества потоков. В разных языках программирования существуют различные решения этой проблемы, но все они сводятся к одному: очень быстро "переключать" текущую выполняемую задачу в каждом потоке, так чтобы все задачи получили возможность выполниться. В Rust это переключение происходит, когда вы вызываете .await.
При написании асинхронного Rust фраза «блокировать поток» означает «препятствовать рантайму в переключении текущей задачи». Это может стать серьезной проблемой, потому что это означает, что другие задачи в том же рантайме перестанут выполняться, пока поток заблокирован. Чтобы предотвратить это, мы должны писать код, который можно быстро переключать, что достигается за счет того, чтобы никогда не проводить долгое время вдали от .await.
Давайте рассмотрим пример: ▶︎
use std::time::Duration; #[tokio::main] async fn main() { println!("Hello World!"); // Здесь нет .await! std::thread::sleep(Duration::from_secs(5)); println!("Five seconds later..."); }
Вышеприведенный код выглядит правильным, и если вы запустите его, он, казалось бы, сработает. Но в нем есть фатальный изъян: он блокирует поток. В данном случае других задач нет, так что это не проблема, но в реальных программах это будет не так. Чтобы проиллюстрировать это, рассмотрим следующий пример: ▶︎
use std::time::Duration; async fn sleep_then_print(timer: i32) { println!("Start timer {}.", timer); // Здесь нет .await! std::thread::sleep(Duration::from_secs(1)); println!("Timer {} done.", timer); } #[tokio::main] async fn main() { // Макрос join! позволяет запускать несколько задач конкурентно. tokio::join!( sleep_then_print(1), sleep_then_print(2), sleep_then_print(3), ); }
Start timer 1.
Timer 1 done.
Start timer 2.
Timer 2 done.
Start timer 3.
Timer 3 done.
Примеру потребуется три секунды на выполнение, и таймеры будут запускаться один за другим, без какого-либо параллелизма. Причина проста: рантайм Tokio не смог переключить одну задачу на другую, потому что такое переключение может произойти только на .await. Поскольку в sleep_then_print нет .await, никакое переключение не может произойти, пока он выполняется.
Однако, если мы вместо этого используем функцию сна Tokio, которая использует .await для ожидания, функция будет вести себя правильно:
▶︎
use tokio::time::Duration; async fn sleep_then_print(timer: i32) { println!("Start timer {}.", timer); tokio::time::sleep(Duration::from_secs(1)).await; // ^ выполнение может быть приостановлено здесь println!("Timer {} done.", timer); } #[tokio::main] async fn main() { // Макрос join! позволяет запускать несколько задач конкурентно. tokio::join!( sleep_then_print(1), sleep_then_print(2), sleep_then_print(3), ); }
Start timer 1.
Start timer 2.
Start timer 3.
Timer 1 done.
Timer 2 done.
Timer 3 done.
Код выполняется всего за одну секунду и правильно, как и задумано, запускает все три функции одновременно.
Имейте в виду, что это не всегда так очевидно. Используя tokio::join!, все три задачи гарантированно выполняются в одном потоке, но если вы замените его на tokio::spawn и будете использовать многопоточный рантайм, вы сможете запускать несколько блокирующих задач, пока не исчерпаете потоки. Рантайм Tokio по умолчанию создает один поток на ядро CPU, и обычно у вас около 8 ядер CPU. Этого достаточно, чтобы можно было пропустить проблему при локальном тестировании, но достаточно мало, чтобы вы очень быстро исчерпали потоки при реальном запуске кода.
Чтобы дать представление о масштабе, сколько времени — это слишком много, хорошим эмпирическим правилом является не более 10–100 микросекунд между каждым .await. Тем не менее, это зависит от типа приложения, которое вы пишете.
Что, если я хочу блокировать?
Иногда мы просто хотим блокировать поток. Это абсолютно нормально. Есть две распространенные причины для этого:
- Вычислительно сложные (CPU-bound) операции.
- Синхронный ввод-вывод (Synchronous IO).
В обоих случаях мы имеем дело с операцией, которая препятствует достижению задачей .await в течение extended period of time. Чтобы решить эту проблему, мы должны переместить блокирующую операцию в поток вне пула потоков Tokio. Есть три варианта этого:
- Использовать функцию
tokio::task::spawn_blocking. - Использовать крейт
rayon. - Создать выделенный поток с помощью
std::thread::spawn.
Давайте рассмотрим каждое решение, чтобы понять, когда его следует использовать.
Функция spawn_blocking
Рантайм Tokio включает отдельный пул потоков, специально предназначенный для выполнения блокирующих функций, и вы можете порождать задачи в нем с помощью spawn_blocking. Этот пул потоков имеет верхний предел около 500 потоков, так что вы можете породить quite a lot блокирующих операций в этом пуле.
Поскольку в пуле потоков так много потоков, он лучше всего подходит для блокирующего IO, такого как взаимодействие с файловой системой или использование блокирующей библиотеки для работы с базами данных, например diesel.
Пул потоков плохо подходит для ресурсоемких вычислений (CPU-bound computations), поскольку в нем гораздо больше потоков, чем ядер CPU на вашем компьютере. CPU-bound вычисления выполняются наиболее эффективно, если количество потоков равно количеству ядер CPU. Тем не менее, если вам нужно всего несколько CPU-bound вычислений, я не стану винить вас за их запуск через spawn_blocking, так как это довольно просто сделать.
▶︎
#[tokio::main] async fn main() { // Это выполняется в Tokio. Мы не должны блокировать здесь. let blocking_task = tokio::task::spawn_blocking(|| { // Это выполняется в потоке, где блокировка допустима. println!("Inside spawn_blocking"); }); // Мы можем ждать завершения блокирующей задачи вот так: // Если блокирующая задача запаникует, unwrap ниже распространит панику. blocking_task.await.unwrap(); }
Крейт rayon
Крейт rayon — это well known библиотека, которая предоставляет пул потоков, специально предназначенный для ресурсоемких вычислений (CPU-bound computations), и вы можете использовать его для этой цели вместе с Tokio. В отличие от spawn_blocking, пул потоков rayon имеет небольшое максимальное количество потоков, поэтому он подходит для дорогостоящих вычислений.
Мы будем использовать сумму большого списка в качестве примера дорогостоящего вычисления, но обратите внимание, что на практике, если только массив не очень-очень большой, простое вычисление суммы, вероятно, достаточно дешево, чтобы вы могли делать его напрямую в Tokio.
Основная опасность использования rayon заключается в том, что вы должны быть осторожны, чтобы не блокировать поток в ожидании завершения rayon. Чтобы сделать это, объедините rayon::spawn с tokio::sync::oneshot следующим образом:
▶︎
async fn parallel_sum(nums: Vec<i32>) -> i32 { let (send, recv) = tokio::sync::oneshot::channel(); // Запустить задачу в rayon. rayon::spawn(move || { // Выполнить тяжелое вычисление. let mut sum = 0; for num in nums { sum += num; } // Отправить результат обратно в Tokio. let _ = send.send(sum); }); // Ждать завершения задачи в rayon. recv.await.expect("Panic in rayon::spawn") } #[tokio::main] async fn main() { let nums = vec![1; 1024 * 1024]; println!("{}", parallel_sum(nums).await); }
Это использует пул потоков rayon для выполнения дорогой операции. Имейте в виду, что приведенный выше пример использует только один поток в пуле rayon за вызов parallel_sum. Это имеет смысл, если у вас много вызовов parallel_sum в вашем приложении, но также можно использовать параллельные итераторы rayon для вычисления суммы на нескольких потоках:
▶︎
#![allow(unused)] fn main() { use rayon::prelude::*; // Запустить задачу в rayon. rayon::spawn(move || { // Вычислить сумму на нескольких потоках. let sum = nums.par_iter().sum(); // Отправить результат обратно в Tokio. let _ = send.send(sum); }); }
Обратите внимание, что вам все еще нужен вызов rayon::spawn при использовании параллельных итераторов, потому что параллельные итераторы являются блокирующими.
Создание выделенного потока
Если блокирующая операция выполняется вечно (keeps running forever), вам следует запускать ее в выделенном потоке. Например, рассмотрим поток, который управляет подключением к базе данных, используя канал для получения операций с базой данных для выполнения. Поскольку этот поток прослушивает этот канал в цикле, он никогда не завершается.
Запуск такой задачи в любом из двух других пулов потоков является проблемой, потому что это essentially забирает поток из пула навсегда. После того как вы сделаете это несколько раз, в пуле потоков не останется потоков, и все остальные блокирующие задачи не смогут быть выполнены.
Конечно, вы также можете использовать выделенные потоки для кратковременных целей, если вы согласны платить цену за порождение нового потока каждый раз, когда вы запускаете новый.
Резюме
Если вы забыли, вот главное, что вам нужно запомнить:
Асинхронный код никогда не должен проводить много времени, не достигая .await.
Ниже вы найдете шпаргалку по методам, которые вы можете использовать, когда хотите заблокировать:
| Метод | CPU-bound вычисления | Синхронный IO | Выполняется вечно |
|---|---|---|---|
spawn_blocking | Субоптимально | OK | Нет |
rayon | OK | Нет | Нет |
| Выделенный поток | OK | OK | OK |
Наконец, я рекомендую ознакомиться с главой о совместном использовании состояния (shared state) из учебника по Tokio. Эта глава объясняет, как вы можете правильно использовать std::sync::Mutex в асинхронном коде и более подробно рассказывает, почему это допустимо, даже несмотря на то, что блокировка мьютекса является блокирующей операцией. (Спойлер: если вы блокируете ненадолго, это действительно блокировка?)
Я также настоятельно рекомендую статью Reducing tail latencies with automatic cooperative task yielding из блога Tokio.
Спасибо Крису Кричо (Chris Krycho) и Эрике Класен (Erika Clasen) за прочтение черновиков этой статьи и предоставление полезных советов. Все ошибки — мои.