Модуль task
Асинхронные "зеленые потоки" (легковесные задачи).
Что такое задачи?
Задача — это легковесная, неблокирующая единица выполнения. Задача похожа на поток операционной системы, но вместо управления планировщиком ОС, ими управляет среда выполнения Tokio. Другое название этого общего шаблона — зеленые потоки (green threads). Если вы знакомы с горутинами Go, корутинами Kotlin или процессами Erlang, вы можете считать задачи Tokio чем-то подобным.
Ключевые моменты о задачах:
-
Задачи легковесны. Поскольку задачи планируются средой выполнения Tokio, а не операционной системой, создание новых задач или переключение между задачами не требует переключения контекста и имеет довольно низкие накладные расходы. Создание, выполнение и уничтожение большого количества задач достаточно дешево, особенно по сравнению с потоками ОС.
-
Задачи планируются кооперативно. Большинство операционных систем реализуют вытесняющую многозадачность. Это техника планирования, при которой ОС позволяет каждому потоку выполняться в течение периода времени, а затем вытесняет его, временно приостанавливая этот поток и переключаясь на другой. Задачи, напротив, реализуют кооперативную многозадачность. При кооперативной многозадачности задаче разрешено выполняться до тех пор, пока она не уступит (yield), указывая планировщику среды выполнения Tokio, что в данный момент она не может продолжать выполнение. Когда задача уступает, среда выполнения Tokio переключается на выполнение следующей задачи.
-
Задачи неблокирующие. Обычно, когда поток ОС выполняет ввод-вывод или должен синхронизироваться с другим потоком, он блокируется, позволяя ОС запланировать другой поток. Когда задача не может продолжить выполнение, она должна уступить, позволяя среде выполнения Tokio запланировать другую задачу. Задачи, как правило, не должны выполнять системные вызовы или другие операции, которые могут заблокировать поток, поскольку это предотвратит выполнение других задач, работающих в том же потоке. Вместо этого этот модуль предоставляет API для выполнения блокирующих операций в асинхронном контексте.
Работа с задачами
Этот модуль предоставляет следующие API для работы с задачами:
Порождение задач (Spawning)
Возможно, самой важной функцией в этом модуле является task::spawn. Эту функцию можно рассматривать как асинхронный эквивалент thread::spawn из стандартной библиотеки. Она принимает асинхронный блок или другой future и создает новую задачу для конкурентного выполнения этой работы:
#![allow(unused)] fn main() { use tokio::task; task::spawn(async { // выполняем некоторую работу здесь... }); }
Как и std::thread::spawn, task::spawn возвращает структуру JoinHandle. Сам JoinHandle является future, который может быть использован для ожидания результата порожденной задачи. Например:
#![allow(unused)] fn main() { use tokio::task; let join = task::spawn(async { // ... "hello world!" }); // ... // Ожидаем результат порожденной задачи. let result = join.await?; assert_eq!(result, "hello world!"); }
Опять же, как и тип JoinHandle из std::thread, если порожденная задача паникует, ожидание ее JoinHandle вернет JoinError. Например:
#![allow(unused)] fn main() { use tokio::task; let join = task::spawn(async { panic!("something bad happened!") }); // Возвращенный результат указывает, что задача завершилась неудачно. assert!(join.await.is_err()); }
spawn, JoinHandle и JoinError доступны при включенном флаге функции "rt".
Отмена (Cancellation)
Порожденные задачи могут быть отменены с помощью методов JoinHandle::abort или AbortHandle::abort. При вызове одного из этих методов задаче сигнализируют о завершении работы в следующий раз, когда она уступит в точке .await. Если задача уже простаивает, она будет завершена как можно скорее, без повторного запуска перед завершением. Кроме того, завершение работы среды выполнения Tokio (например, возврат из #[tokio::main]) немедленно отменяет все задачи в ней.
При завершении задач выполнение остановится на том .await, на котором она уступила. Все локальные переменные уничтожаются путем запуска их деструкторов. После завершения остановки ожидание JoinHandle завершится ошибкой отмены.
Обратите внимание, что отмена задачи не гарантирует, что она завершится с ошибкой отмены, поскольку она может сначала завершиться нормально. Например, если задача не уступает среде выполнения ни в одной точке между вызовом abort и концом задачи, то JoinHandle вместо этого сообщит, что задача завершилась нормально.
Имейте в виду, что задачи, порожденные с помощью spawn_blocking, не могут быть отменены, потому что они не являются асинхронными. Если вы вызовете abort для задачи spawn_blocking, это не окажет никакого эффекта, и задача продолжит выполняться нормально. Исключение составляет случай, когда задача еще не начала выполняться; в этом случае вызов abort может предотвратить запуск задачи.
Имейте в виду, что вызовы JoinHandle::abort только планируют отмену задачи и возвращаются до завершения отмены. Чтобы дождаться завершения отмены, дождитесь завершения задачи, ожидая JoinHandle. Аналогично, метод JoinHandle::is_finished не возвращает true, пока отмена не завершится.
Многократный вызов JoinHandle::abort имеет тот же эффект, что и однократный вызов.
Tokio также предоставляет AbortHandle, который похож на JoinHandle, за исключением того, что он не предоставляет механизма ожидания завершения задачи. Каждая задача может иметь только один JoinHandle, но она может иметь более одного AbortHandle.
Блокирование и уступка (Blocking and Yielding)
Как мы обсуждали выше, код, выполняющийся в асинхронных задачах, не должен выполнять операции, которые могут блокировать. Блокирующая операция, выполненная в задаче, работающей в потоке, который также выполняет другие задачи, заблокирует весь поток, не позволяя другим задачам выполняться.
Вместо этого Tokio предоставляет два API для выполнения блокирующих операций в асинхронном контексте: task::spawn_blocking и task::block_in_place.
Имейте в виду, что если вы вызываете не-асинхронный метод из асинхронного кода, этот не-асинхронный метод все еще находится внутри асинхронного контекста, поэтому вам также следует избегать там блокирующих операций. Это включает деструкторы объектов, уничтожаемых в асинхронном коде.
spawn_blocking
Функция task::spawn_blocking похожа на функцию task::spawn, обсуждавшуюся в предыдущем разделе, но вместо порождения неблокирующего future в среде выполнения Tokio она порождает блокирующую функцию в выделенном пуле потоков для блокирующих задач. Например:
#![allow(unused)] fn main() { use tokio::task; task::spawn_blocking(|| { // выполняем ресурсоемкую работу или вызываем синхронный код }); }
Так же, как task::spawn, task::spawn_blocking возвращает JoinHandle, который мы можем использовать для ожидания результата блокирующей операции:
#![allow(unused)] fn main() { let join = task::spawn_blocking(|| { // выполняем ресурсоемкую работу или вызываем синхронный код "blocking completed" }); let result = join.await?; assert_eq!(result, "blocking completed"); }
block_in_place
При использовании многопоточной среды выполнения также доступна функция task::block_in_place. Как и task::spawn_blocking, эта функция позволяет запускать блокирующую операцию из асинхронного контекста. Однако, в отличие от spawn_blocking, block_in_place работает путем перевода текущего рабочего потока в блокирующий поток, перемещая другие задачи, выполняющиеся в этом потоке, на другой рабочий поток. Это может повысить производительность, избегая переключений контекста.
Например:
#![allow(unused)] fn main() { use tokio::task; let result = task::block_in_place(|| { // выполняем ресурсоемкую работу или вызываем синхронный код "blocking completed" }); assert_eq!(result, "blocking completed"); }
yield_now
Кроме того, этот модуль предоставляет асинхронную функцию task::yield_now, которая аналогична thread::yield_now из стандартной библиотеки. Вызов и ожидание этой функции заставят текущую задачу уступить планировщику среды выполнения Tokio, позволяя запланировать другие задачи. В конечном итоге уступившая задача будет опрошена снова, что позволит ей выполниться. Например:
#![allow(unused)] fn main() { use tokio::task; async { task::spawn(async { // ... println!("spawned task done!") }); // Уступаем, позволяя сначала выполниться новой задаче. task::yield_now().await; println!("main task done!"); } }
Модули
| Имя | Флаги | Описание |
|---|---|---|
coop | rt | Утилиты для улучшенного кооперативного планирования. |
futures | rt | Future, связанные с задачами. |
join_set | rt | Коллекция задач, порожденных в среде выполнения Tokio. |
Структуры
| Имя | Флаги | Описание |
|---|---|---|
AbortHandle | rt | Владение разрешением на отмену порожденной задачи без ожидания ее завершения. |
Builder | tokio_unstable и tracing | Фабрика, используемая для настройки свойств новой задачи. |
Id | rt | Непрозрачный ID, однозначно идентифицирующий задачу относительно других текущих. |
JoinError | rt | Задача не выполнилась до завершения. |
JoinHandle | rt | Владение разрешением на присоединение к задаче (ожидание ее завершения). |
JoinSet | rt | Коллекция задач, порожденных в среде выполнения Tokio. |
LocalEnterGuard | rt | Контекстная защита для LocalSet. |
LocalKey | rt | Ключ для локальных данных задачи. |
LocalSet | rt | Набор задач, которые выполняются в одном потоке. |
Функции
| Имя | Флаги | Описание |
|---|---|---|
block_in_place | rt-multi-thread | Выполняет предоставленную блокирующую функцию в текущем потоке без блокировки исполнителя. |
id | rt | Возвращает Id текущей выполняемой задачи. |
spawn | rt | Порождает новую асинхронную задачу, возвращая JoinHandle для нее. |
spawn_blocking | rt | Выполняет предоставленное замыкание в потоке, где блокировка допустима. |
spawn_local | rt | Порождает future !Send в текущем LocalSet или LocalRuntime. |
try_id | rt | Возвращает Id текущей выполняемой задачи или None, если вызвано вне задачи. |
yield_now | rt | Уступает выполнение обратно среде выполнения Tokio. |