Крейт tokio
Сайт проекта https://tokio.rs/
Среда выполнения для создания надежных сетевых приложений без компромиссов в скорости.
Tokio — это событийно-ориентированная, неблокирующая платформа ввода-вывода для написания асинхронных приложений на языке Rust. На высоком уровне она предоставляет несколько основных компонентов:
- Инструменты для работы с асинхронными задачами, включая примитивы синхронизации, каналы, таймауты, задержки и интервалы.
- API для выполнения асинхронного ввода-вывода, включая TCP- и UDP-сокеты, операции с файловой системой, а также управление процессами и сигналами.
- Среду выполнения для исполнения асинхронного кода, включая планировщик задач, драйвер ввода-вывода, использующий системные очереди событий (epoll, kqueue, IOCP и т.д.), и высокопроизводительный таймер.
Документация на уровне руководства находится на веб-сайте.
Обзор Tokio
Tokio состоит из ряда модулей, которые предоставляют функциональность, необходимую для реализации асинхронных приложений на Rust. В этом разделе мы кратко рассмотрим Tokio, резюмируя основные API и их назначение.
Самый простой способ начать — включить все функции. Для этого используйте флаг full:
tokio = { version = "1", features = ["full"] }
Создание приложений
Tokio отлично подходит для написания приложений, и большинству пользователей в этом случае не стоит слишком беспокоиться о выборе функций. Если вы не уверены, мы рекомендуем использовать full, чтобы избежать препятствий в процессе разработки.
Пример
Этот пример показывает самый быстрый способ начать работу с Tokio.
tokio = { version = "1", features = ["full"] }
Создание библиотек
Как автор библиотеки, вы должны стремиться предоставить максимально легковесный крейт, основанный на Tokio. Для этого следует включать только те функции, которые вам необходимы. Это позволяет пользователям использовать ваш крейт без необходимости включать лишние функции.
Пример
Этот пример показывает, как можно импортировать функции для библиотеки, которой нужно только tokio::spawn и TcpStream.
tokio = { version = "1", features = ["rt", "net"] }
Работа с задачами
Асинхронные программы в Rust основаны на легковесных, неблокирующих единицах выполнения, называемых задачами. Модуль tokio::task предоставляет важные инструменты для работы с задачами:
- Функцию
spawnи типJoinHandleдля планирования новой задачи в среде выполнения Tokio и ожидания результата выполненной задачи соответственно. - Функции для запуска блокирующих операций в контексте асинхронной задачи.
Модуль tokio::task доступен только при включенном флаге функции "rt".
Модуль tokio::sync содержит примитивы синхронизации для обмена данными. К ним относятся:
- Каналы (
oneshot,mpsc,watchиbroadcast) для отправки значений между задачами. - Неблокирующий
Mutexдля контроля доступа к разделяемому, изменяемому значению. - Асинхронный тип
Barrierдля синхронизации нескольких задач перед началом вычислений.
Модуль tokio::sync доступен только при включенном флаге функции "sync".
Модуль tokio::time предоставляет утилиты для отслеживания времени и планирования работы. Это включает функции для установки таймаутов для задач, откладывания выполнения на будущее или повторения операции через интервалы.
Для использования tokio::time должен быть включен флаг функции "time".
Наконец, Tokio предоставляет среду выполнения для исполнения асинхронных задач. Большинство приложений могут использовать макрос #[tokio::main] для запуска своего кода в среде выполнения Tokio. Однако этот макрос предоставляет только базовые опции конфигурации. В качестве альтернативы, модуль tokio::runtime предоставляет более мощные API для настройки и управления средами выполнения. Вам следует использовать этот модуль, если макрос #[tokio::main] не предоставляет нужной вам функциональности.
Использование среды выполнения требует флагов функций "rt" или "rt-multi-thread" для включения однопоточного планировщика для текущего потока и многопоточного планировщика соответственно. Подробности см. в документации к модулю runtime. Кроме того, флаг функции "macros" включает атрибуты #[tokio::main] и #[tokio::test].
CPU-ограниченные задачи и блокирующий код
Tokio может параллельно выполнять множество задач в нескольких потоках, постоянно переключая выполняемую в данный момент задачу в каждом потоке. Однако такая перестановка может происходить только в точках .await, поэтому код, который долгое время работает без достижения .await, будет блокировать выполнение других задач. Для решения этой проблемы Tokio предоставляет два вида потоков: Основные потоки и Блокирующие потоки.
Основные потоки — это место, где выполняется весь асинхронный код, и по умолчанию Tokio создает по одному такому потоку на каждое ядро CPU. Вы можете использовать переменную окружения TOKIO_WORKER_THREADS для переопределения значения по умолчанию.
Блокирующие потоки создаются по требованию, могут использоваться для запуска блокирующего кода, который в противном случае блокировал бы выполнение других задач, и сохраняются alive в течение некоторого времени бездействия, которое можно настроить с помощью thread_keep_alive. Поскольку Tokio не может вытеснять блокирующие задачи, как это можно делать с асинхронным кодом, верхний предел количества блокирующих потоков очень велик. Эти ограничения можно настроить в Builder.
Для порождения блокирующей задачи следует использовать функцию spawn_blocking.
#[tokio::main] async fn main() { // Этот код выполняется в основном потоке. let blocking_task = tokio::task::spawn_blocking(|| { // Этот код выполняется в блокирующем потоке. // Здесь блокировка допустима. }); // Мы можем ждать завершения блокирующей задачи так: // Если блокирующая задача запаникует, unwrap ниже распространит панику. blocking_task.await.unwrap(); }
Если ваш код ограничен производительностью CPU и вы хотите ограничить количество потоков для его выполнения, вам следует использовать отдельный пул потоков, предназначенный для CPU-ограниченных задач. Например, вы можете рассмотреть возможность использования библиотеки rayon для таких задач. Также можно создать дополнительную среду выполнения Tokio, предназначенную для CPU-ограниченных задач, но в этом случае следует убедиться, что эта среда выполнения выполняет только CPU-ограниченные задачи, так как задачи, ограниченные вводом-выводом, в этой среде будут работать плохо.
Подсказка: При использовании rayon вы можете использовать канал oneshot для отправки результата обратно в Tokio по завершении задачи rayon.
Асинхронный ввод-вывод
Помимо планирования и выполнения задач, Tokio предоставляет все необходимое для асинхронного выполнения ввода и вывода.
Модуль tokio::io предоставляет основные асинхронные примитивы ввода-вывода Tokio: трейты AsyncRead, AsyncWrite и AsyncBufRead. Кроме того, при включенном флаге функции "io-util" он также предоставляет комбинаторы и функции для работы с этими трейтами, образуя асинхронный аналог std::io.
Tokio также включает API для выполнения различных видов ввода-вывода и асинхронного взаимодействия с операционной системой. К ним относятся:
tokio::net— содержит неблокирующие версии TCP, UDP и Unix Domain Sockets (включается флагом"net").tokio::fs— аналогиченstd::fs, но для асинхронного выполнения операций с файловой системой (включается флагом"fs").tokio::signal— для асинхронной обработки сигналов ОС Unix и Windows (включается флагом"signal").tokio::process— для порождения и управления дочерними процессами (включается флагом"process").
Примеры
Простой TCP-эхо сервер:
use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; 1024]; // В цикле читаем данные из сокета и записываем их обратно. loop { let n = match socket.read(&mut buf).await { // сокет закрыт Ok(0) => return, Ok(n) => n, Err(e) => { eprintln!("не удалось прочитать из сокета; err = {:?}", e); return; } }; // Записываем данные обратно if let Err(e) = socket.write_all(&buf[0..n]).await { eprintln!("не удалось записать в сокет; err = {:?}", e); return; } } }); } }
Флаги функций
Tokio использует набор флагов функций для уменьшения количества компилируемого кода. Можно включать только определенные функции. По умолчанию Tokio не включает никакие функции, но позволяет включить их подмножество для конкретного случая использования. Ниже приведен список доступных флагов функций. Вы также можете заметить, что над каждой функцией, структурой и трейтом указан один или несколько флагов функций, необходимых для использования этого элемента. Если вы новичок в Tokio, рекомендуется использовать флаг full, который включает все публичные API. Однако имейте в виду, что это потянет за собой множество дополнительных зависимостей, которые могут вам не понадобиться.
full: Включает все функции, перечисленные ниже, кромеtest-utilиtracing.rt: Включаетtokio::spawn, планировщик для текущего потока и утилиты, не связанные с планировщиком.rt-multi-thread: Включает более тяжелый многопоточный планировщик с work-stealing.io-util: Включает трейты-расширения на основе ввода-вывода.io-std: Включает типыStdout,StdinиStderr.net: Включает типыtokio::net, такие какTcpStream,UnixStreamиUdpSocket, а также (в Unix-подобных системах)AsyncFdи (в FreeBSD)PollAio.time: Включает типыtokio::timeи позволяет планировщикам включить встроенный таймер.process: Включает типыtokio::process.macros: Включает макросы#[tokio::main]и#[tokio::test].sync: Включает все типыtokio::sync.signal: Включает все типыtokio::signal.fs: Включает типыtokio::fs.test-util: Включает инфраструктуру для тестирования среды выполнения Tokio.parking_lot: В качестве потенциальной оптимизации использует примитивы синхронизации из крейтаparking_lotвнутри. Также эта зависимость необходима для создания некоторых наших примитивов в контекстеconst. MSRV может увеличиваться в соответствии с используемой версиейparking_lot.
Примечание: Трейты AsyncRead и AsyncWrite не требуют никаких флагов и всегда доступны.
Нестабильные функции
Некоторые флаги функций доступны только при указании флага tokio_unstable:
tracing: Включает события tracing.
Аналогично, некоторые части API доступны только с тем же флагом:
task::Builder- Некоторые методы в
task::JoinSet runtime::RuntimeMetricsruntime::Builder::on_task_spawnruntime::Builder::on_task_terminateruntime::Builder::unhandled_panicruntime::TaskMeta
Этот флаг включает нестабильные функции. Их публичный API может нарушать семантическое версионирование в релизах 1.x. Чтобы включить эти функции, аргумент --cfg tokio_unstable должен быть передан в rustc при компиляции. Это позволяет явно согласиться на использование функций, которые могут нарушать соглашения semver, поскольку Cargo пока не поддерживает такие соглашения напрямую.
Вы можете указать это в файле .cargo/config.toml вашего проекта:
[build]
rustflags = ["--cfg", "tokio_unstable"]
Раздел [build] указывается не в файле Cargo.toml, а в конфигурационном файле Cargo .cargo/config.toml.
Альтернативно, вы можете указать это с помощью переменной окружения:
# Многие *nix-оболочки:
export RUSTFLAGS="--cfg tokio_unstable"
cargo build
# Windows PowerShell:
$Env:RUSTFLAGS="--cfg tokio_unstable"
cargo build
Поддерживаемые платформы
Tokio в настоящее время гарантированно поддерживает следующие платформы:
- Linux
- Windows
- Android (уровень API 21)
- macOS
- iOS
- FreeBSD
Tokio будет продолжать поддерживать эти платформы в будущем. Однако будущие релизы могут изменить требования, такие как минимальная требуемая версия libc в Linux, уровень API в Android или поддерживаемая версия FreeBSD.
Помимо указанных выше платформ, Tokio предназначен для работы на всех платформах, поддерживаемых крейтом mio. Более длинный список можно найти в документации mio. Однако эти дополнительные платформы могут стать неподдерживаемыми в будущем.
Обратите внимание, что Wine считается платформой, отличной от Windows. Подробнее о поддержке Wine см. в документации mio.
Поддержка WASM
Tokio имеет ограниченную поддержку платформы WASM. Без флага tokio_unstable поддерживаются следующие функции:
syncmacrosio-utilrttime
Включение любой другой функции (включая full) приведет к ошибке компиляции.
Модуль time будет работать только на WASM-платформах, которые поддерживают таймеры (например, wasm32-wasi). Функции времени вызовут панику, если используются на WASM-платформе без поддержки таймеров.
Также обратите внимание, что если среда выполнения становится бесконечно бездействующей, она немедленно запаникует вместо вечной блокировки. На платформах, которые не поддерживают время, это означает, что среда выполнения никогда не может простаивать.
Нестабильная поддержка WASM
Tokio также имеет нестабильную поддержку некоторых дополнительных WASM-функций. Это требует использования флага tokio_unstable.
Использование этого флага позволяет использовать tokio::net для цели wasm32-wasi. Однако не все методы доступны для сетевых типов, поскольку WASI в настоящее время не поддерживает создание новых сокетов изнутри WASM. Из-за этого сокеты в настоящее время должны создаваться с помощью трейта FromRawFd.
Реэкспорты
#![allow(unused)] fn main() { pub use task::spawn; // rt }
Модули
| Имя | Флаги | Описание |
|---|---|---|
fs | fs | Асинхронные файловые утилиты. |
io | Трейты, вспомогательные средства и определения типов для асинхронной функциональности ввода-вывода. | |
net | net | Привязки TCP/UDP/Unix для tokio. |
process | process | Реализация асинхронного управления процессами для Tokio. |
runtime | rt | Среда выполнения Tokio. |
signal | signal | Асинхронная обработка сигналов для Tokio. |
sync | sync | Примитивы синхронизации для использования в асинхронных контекстах. |
task | rt | Асинхронные "зеленые потоки" (задачи). |
time | time | Утилиты для отслеживания времени. |
Макросы
| Имя | Флаги | Описание |
|---|---|---|
join | macros | Ожидает завершения нескольких параллельных ветвей. |
pin | Закрепляет значение в стеке. | |
select | macros | Ожидает завершения первой из нескольких параллельных ветвей, отменяя остальные. |
task_local | rt | Объявляет новый задачно-локальный ключ типа tokio::task::LocalKey. |
try_join | macros | Ожидает успешного завершения всех параллельных ветвей или первой ошибки. |
Атрибут-макросы
| Имя | Флаги | Описание |
|---|---|---|
main | rt и macros | Помечает асинхронную функцию для выполнения выбранной средой выполнения. Помогает настроить Runtime без прямого использования Runtime или Builder. |
test | rt и macros | Помечает асинхронную функцию для выполнения средой выполнения, подходящей для тестов. Помогает настроить Runtime без прямого использования Runtime или Builder. |
Макросы
Макрос join
#![allow(unused)] fn main() { macro_rules! join { ($(biased;)? $($future:expr),*) => { ... }; } }
Доступно только с флагом функции macros.
Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда все ветви завершатся.
Описание
Макрос join! должен использоваться внутри асинхронных функций, замыканий и блоков.
Макрос join! принимает список асинхронных выражений и выполняет их конкурентно в одной и той же задаче. Каждое асинхронное выражение вычисляется в future, и future из каждого выражения мультиплексируются в текущей задаче.
При работе с асинхронными выражениями, возвращающими Result, join! будет ждать завершения всех ветвей независимо от того, завершилась ли какая-либо из них с Err. Используйте try_join! для досрочного возврата при обнаружении Err.
Примечания
Предоставленные future хранятся inline и не требуют выделения Vec.
Характеристики времени выполнения
Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в join!.
Честность (Fairness)
По умолчанию сгенерированный future макроса join! чередует порядок опроса содержащихся future при каждом пробуждении.
Это поведение можно переопределить, добавив biased; в начало использования макроса. Подробности см. в примерах. Это заставит join опрашивать future в порядке их следования сверху вниз.
Это может быть полезно, если ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение.
Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы объединяете поток и future завершения, и поток имеет огромный объем сообщений, обработка которых занимает много времени за один опрос, вам следует поместить future завершения раньше в списке join!, чтобы гарантировать, что он всегда будет опрашиваться и не будет задерживаться из-за того, что future потока долго возвращает Poll::Pending.
Примеры
Базовое объединение с двумя ветвями
#![allow(unused)] fn main() { async fn do_stuff_async() { // асинхронная работа } async fn more_async_work() { // больше асинхронной работы } let (first, second) = tokio::join!( do_stuff_async(), more_async_work() ); // делаем что-то со значениями }
Использование режима biased; для контроля порядка опроса
#![allow(unused)] fn main() { async fn do_stuff_async() { // асинхронная работа } async fn more_async_work() { // больше асинхронной работы } let (first, second) = tokio::join!( biased; do_stuff_async(), more_async_work() ); // делаем что-то со значениями }
Обработка результатов с ошибками
#![allow(unused)] fn main() { async fn fetch_data() -> Result<String, reqwest::Error> { // получение данных } async fn process_file() -> Result<Vec<u8>, std::io::Error> { // обработка файла } // join! будет ждать завершения обеих ветвей, даже если одна вернет Err let (data_result, file_result) = tokio::join!( fetch_data(), process_file() ); // Обрабатываем результаты независимо match data_result { Ok(data) => println!("Данные получены: {}", data), Err(e) => println!("Ошибка получения данных: {}", e), } match file_result { Ok(file) => println!("Файл обработан, размер: {} байт", file.len()), Err(e) => println!("Ошибка обработки файла: {}", e), } }
Использование с разными типами возвращаемых значений
#![allow(unused)] fn main() { async fn get_number() -> i32 { 42 } async fn get_text() -> &'static str { "hello" } async fn get_vector() -> Vec<u8> { vec![1, 2, 3] } let (number, text, vector) = tokio::join!( get_number(), get_text(), get_vector() ); println!("Number: {}, Text: {}, Vector: {:?}", number, text, vector); }
Макрос select
#![allow(unused)] fn main() { macro_rules! select { { $( biased; )? $( $bind:pat = $fut:expr $(, if $cond:expr)? => $handler:expr, )* $( else => $els:expr $(,)? )? } => { ... }; } }
Доступно только с флагом функции macros.
Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда первая ветвь завершается, отменяя оставшиеся ветви.
Описание
Макрос select! должен использоваться внутри асинхронных функций, замыканий и блоков.
Макрос select! принимает одну или несколько ветвей со следующим шаблоном:
<шаблон> = <асинхронное выражение> (, if <предусловие>)? => <обработчик>,
Дополнительно макрос select! может включать единственную необязательную ветвь else, которая выполняется, если ни одна из других ветвей не соответствует своим шаблонам:
else => <выражение>
Макрос агрегирует все выражения <асинхронное выражение> и выполняет их конкурентно в текущей задаче. Как только первое выражение завершается со значением, соответствующим его <шаблону>, макрос select! возвращает результат вычисления выражения <обработчик> завершенной ветви.
Кроме того, каждая ветвь может включать необязательное предусловие if. Если предусловие возвращает false, то ветвь отключается. Предоставленное <асинхронное выражение> все равно вычисляется, но результирующий future никогда не опрашивается. Эта возможность полезна при использовании select! в цикле.
Полный жизненный цикл выражения select! выглядит следующим образом:
- Вычислить все предоставленные выражения
<предусловие>. Если предусловие возвращаетfalse, отключить ветвь до конца текущего вызоваselect!. Повторный вход вselect!из-за цикла сбрасывает состояние "отключено". - Агрегировать
<асинхронное выражение>из каждой ветви, включая отключенные. Если ветвь отключена,<асинхронное выражение>все равно вычисляется, но результирующий future не опрашивается. - Если все ветви отключены: перейти к шагу 6.
- Конкурентно ожидать результаты всех оставшихся
<асинхронное выражение>. - Как только
<асинхронное выражение>возвращает значение, попытаться применить значение к предоставленному<шаблону>. Если шаблон совпадает, вычислить<обработчик>и вернуть результат. Если шаблон не совпадает, отключить текущую ветвь до конца текущего вызоваselect!. Продолжить с шага 3. - Вычислить выражение
else. Если выражениеelseне предоставлено, вызвать панику.
Характеристики времени выполнения
Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в select!.
Честность (Fairness)
По умолчанию select! случайным образом выбирает ветвь для проверки первой. Это обеспечивает некоторый уровень честности при вызове select! в цикле с ветвями, которые всегда готовы.
Это поведение можно переопределить, добавив biased; в начало использования макроса. Это заставит select опрашивать future в порядке их следования сверху вниз. Есть несколько причин, по которым это может понадобиться:
- Генерация случайных чисел в
tokio::select!имеет ненулевую стоимость CPU - Ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение
Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы выбираете между потоком и future завершения, и поток имеет огромный объем сообщений и нулевое или почти нулевое время между ними, вам следует поместить future завершения раньше в списке select!, чтобы гарантировать, что он всегда будет опрашиваться и не будет проигнорирован из-за того, что поток постоянно готов.
Паника
Макрос select! вызывает панику, если все ветви отключены и нет предоставленной ветви else. Ветвь отключается, когда предоставленное предусловие if возвращает false или когда шаблон не соответствует результату <асинхронное выражение>.
Безопасность отмены (Cancellation Safety)
При использовании select! в цикле для получения сообщений из нескольких источников следует убедиться, что вызов получения безопасен при отмене, чтобы избежать потери сообщений. В этом разделе рассматриваются различные распространенные методы и описывается, являются ли они безопасными при отмене. Списки в этом разделе не являются исчерпывающими.
Следующие методы безопасны при отмене:
tokio::sync::mpsc::Receiver::recvtokio::sync::mpsc::UnboundedReceiver::recvtokio::sync::broadcast::Receiver::recvtokio::sync::watch::Receiver::changedtokio::net::TcpListener::accepttokio::net::UnixListener::accepttokio::signal::unix::Signal::recvtokio::io::AsyncReadExt::readна любомAsyncReadtokio::io::AsyncReadExt::read_bufна любомAsyncReadtokio::io::AsyncWriteExt::writeна любомAsyncWritetokio::io::AsyncWriteExt::write_bufна любомAsyncWritetokio_stream::StreamExt::nextна любомStreamfutures::stream::StreamExt::nextна любомStream
Следующие методы не безопасны при отмене и могут привести к потере данных:
tokio::io::AsyncReadExt::read_exacttokio::io::AsyncReadExt::read_to_endtokio::io::AsyncReadExt::read_to_stringtokio::io::AsyncWriteExt::write_all
Следующие методы не безопасны при отмене, потому что они используют очередь для честности, и отмена заставляет вас потерять свое место в очереди:
tokio::sync::Mutex::locktokio::sync::RwLock::readtokio::sync::RwLock::writetokio::sync::Semaphore::acquiretokio::sync::Notify::notified
Чтобы определить, являются ли ваши собственные методы безопасными при отмене, ищите места использования .await. Это связано с тем, что когда асинхронный метод отменяется, это всегда происходит в .await. Если ваша функция ведет себя корректно, даже если она перезапускается во время ожидания в .await, то она безопасна при отмене.
Безопасность отмены можно определить следующим образом: если у вас есть future, который еще не завершился, то удаление этого future и его повторное создание должно быть no-op операцией. Это определение мотивировано ситуацией, когда select! используется в цикле. Без этой гарантии вы потеряете свой прогресс, когда другая ветвь завершится и вы перезапустите select!, пройдя по циклу.
Имейте в виду, что отмена чего-либо, что не является безопасным при отмене, не обязательно является ошибкой. Например, если вы отменяете задачу, потому что приложение завершает работу, то вам, вероятно, все равно, что частично прочитанные данные будут потеряны.
Примеры
Базовый select с двумя ветвями
#![allow(unused)] fn main() { async fn do_stuff_async() { // асинхронная работа } async fn more_async_work() { // больше асинхронной работы } tokio::select! { _ = do_stuff_async() => { println!("do_stuff_async() завершился первым") } _ = more_async_work() => { println!("more_async_work() завершился первым") } }; }
Базовый выбор между потоками
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; let mut stream1 = stream::iter(vec![1, 2, 3]); let mut stream2 = stream::iter(vec![4, 5, 6]); let next = tokio::select! { v = stream1.next() => v.unwrap(), v = stream2.next() => v.unwrap(), }; assert!(next == 1 || next == 4); }
Сбор содержимого двух потоков
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; let mut stream1 = stream::iter(vec![1, 2, 3]); let mut stream2 = stream::iter(vec![4, 5, 6]); let mut values = vec![]; loop { tokio::select! { Some(v) = stream1.next() => values.push(v), Some(v) = stream2.next() => values.push(v), else => break, } } values.sort(); assert_eq!(&[1, 2, 3, 4, 5, 6], &values[..]); }
Использование с таймаутом
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; use tokio::time::{self, Duration}; let mut stream = stream::iter(vec![1, 2, 3]); let sleep = time::sleep(Duration::from_secs(1)); tokio::pin!(sleep); loop { tokio::select! { maybe_v = stream.next() => { if let Some(v) = maybe_v { println!("получено = {}", v); } else { break; } } _ = &mut sleep => { println!("таймаут"); break; } } } }
Объединение двух значений с использованием select!
#![allow(unused)] fn main() { use tokio::sync::oneshot; let (tx1, mut rx1) = oneshot::channel(); let (tx2, mut rx2) = oneshot::channel(); tokio::spawn(async move { tx1.send("first").unwrap(); }); tokio::spawn(async move { tx2.send("second").unwrap(); }); let mut a = None; let mut b = None; while a.is_none() || b.is_none() { tokio::select! { v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()), v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()), } } let res = (a.unwrap(), b.unwrap()); assert_eq!(res.0, "first"); assert_eq!(res.1, "second"); }
Использование режима biased; для контроля порядка опроса
#![allow(unused)] fn main() { let mut count = 0u8; loop { tokio::select! { // Если запустить этот пример без `biased;`, порядок опроса // псевдослучайный, и проверки значения count (вероятно) провалятся. biased; _ = async {}, if count < 1 => { count += 1; assert_eq!(count, 1); } _ = async {}, if count < 2 => { count += 1; assert_eq!(count, 2); } _ = async {}, if count < 3 => { count += 1; assert_eq!(count, 3); } _ = async {}, if count < 4 => { count += 1; assert_eq!(count, 4); } else => { break; } }; } }
Избегание гонок в предусловиях if
Поскольку предусловия if используются для отключения ветвей select!, следует проявлять осторожность, чтобы избежать пропуска значений.
Например, вот неправильное использование sleep с if. Цель - повторно выполнять асинхронную задачу до 50 миллисекунд. Однако существует вероятность пропуска завершения сна.
ⓘ
#![allow(unused)] fn main() { use tokio::time::{self, Duration}; async fn some_async_work() { // выполняем работу } let sleep = time::sleep(Duration::from_millis(50)); tokio::pin!(sleep); while !sleep.is_elapsed() { tokio::select! { _ = &mut sleep, if !sleep.is_elapsed() => { println!("операция превысила время ожидания"); } _ = some_async_work() => { println!("операция завершена"); } } } }
В приведенном выше примере sleep.is_elapsed() может возвращать true, даже если sleep.poll() никогда не возвращал Ready. Это создает потенциальное состояние гонки, когда sleep истекает между проверкой while !sleep.is_elapsed() и вызовом select!, в результате чего вызов some_async_work() выполняется беспрепятственно, несмотря на истечение времени сна.
Один из способов написать приведенный выше пример без гонки:
#![allow(unused)] fn main() { use tokio::time::{self, Duration}; async fn some_async_work() { // выполняем работу } let sleep = time::sleep(Duration::from_millis(50)); tokio::pin!(sleep); loop { tokio::select! { _ = &mut sleep => { println!("операция превысила время ожидания"); break; } _ = some_async_work() => { println!("операция завершена"); } } } }
Альтернативы из экосистемы
Макрос select! - это мощный инструмент для управления несколькими асинхронными ветвями, позволяющий задачам выполняться конкурентно в одном потоке. Однако его использование может создавать сложности, особенно связанные с безопасностью отмены, что может привести к трудноуловимым и сложным для отладки ошибкам. Для многих случаев использования альтернативы из экосистемы могут быть предпочтительнее, поскольку они смягчают эти проблемы, предлагая более четкий синтаксис, более предсказуемое управление потоком и уменьшая необходимость ручного решения таких проблем, как семантика предохранителей или безопасность отмены.
Объединение потоков (Merging Streams)
Для случаев, когда loop { select! { ... } } используется для опроса нескольких задач, объединение потоков предлагает краткую альтернативу, которая по своей природе обрабатывает безопасную при отмене обработку, устраняя риск потери данных. Библиотеки, такие как tokio_stream, futures::stream и futures_concurrency, предоставляют инструменты для объединения потоков и последовательной обработки их выходных данных.
Гонка Future (Racing Futures)
Если вам нужно дождаться первого завершения среди нескольких асинхронных задач, утилиты экосистемы, такие как futures, futures-lite или futures-concurrency, предоставляют упрощенный синтаксис для гонки future:
futures_concurrency::future::Racefutures::selectfutures::stream::select_all(для потоков)futures_lite::future::orfutures_lite::future::race
#![allow(unused)] fn main() { use futures_concurrency::future::Race; let task_a = async { Ok("ok") }; let task_b = async { Err("error") }; let result = (task_a, task_b).race().await; match result { Ok(output) => println!("Первая задача завершилась с: {output}"), Err(err) => eprintln!("Произошла ошибка: {err}"), } }
Macro task_local
#![allow(unused)] fn main() { macro_rules! task_local { () => { ... }; ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { ... }; ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { ... }; } }
Доступно только в crate feature rt.
Объявляет новый локальный ключ типа tokio::task::LocalKey
Синтакс
Макрос оборачивает любое количество статических объявлений и делает их локальными для текущей задачи. Публичность и атрибуты для каждого выражения сохраняются. Например:
Пример
#![allow(unused)] fn main() { task_local! { pub static ONE: u32; #[allow(unused)] static TWO: f32; } }
См. LocalKey документацию для большей информации.
Макрос pin
#![allow(unused)] fn main() { macro_rules! pin { ($($x:ident),*) => { ... }; ($( let $x:ident = $init:expr; )*) => { ... }; } }
Закрепляет значение в стеке.
Описание
Вызовы async fn возвращают анонимные значения Future, которые являются !Unpin. Эти значения должны быть закреплены до того, как их можно будет опросить. Вызов .await обработает это, но потребляет future. Если требуется вызвать .await на ссылке &mut _, вызывающая сторона отвечает за закрепление future.
Закрепление может быть выполнено путем выделения памяти с помощью Box::pin или использования стека с помощью макроса pin!.
Следующий код не скомпилируется: ⓘ
async fn my_async_fn() { // асинхронная логика здесь } #[tokio::main] async fn main() { let mut future = my_async_fn(); (&mut future).await; // Ошибка компиляции! }
Чтобы это работало, требуется закрепление:
#![allow(unused)] fn main() { use tokio::pin; async fn my_async_fn() { // асинхронная логика здесь } let future = my_async_fn(); pin!(future); (&mut future).await; // Теперь работает }
Закрепление полезно при использовании select! и операторов потоков, которые требуют T: Stream + Unpin.
Использование
Макрос pin! принимает идентификаторы в качестве аргументов. Он не работает с выражениями.
Следующее не компилируется, так как выражение передается в pin!:
ⓘ
async fn my_async_fn() { // асинхронная логика здесь } #[tokio::main] async fn main() { let mut future = pin!(my_async_fn()); // Ошибка: ожидается идентификатор (&mut future).await; }
Примеры
Использование с select!
#![allow(unused)] fn main() { use tokio::{pin, select}; use tokio_stream::{self as stream, StreamExt}; async fn my_async_fn() { // асинхронная логика здесь } let mut stream = stream::iter(vec![1, 2, 3, 4]); let future = my_async_fn(); pin!(future); loop { select! { _ = &mut future => { // Прекращаем цикл после завершения future break; } Some(val) = stream.next() => { println!("получено значение = {}", val); } } } }
Одновременное объявление переменной и закрепление
Поскольку присваивание переменной с последующим закреплением является распространенной операцией, существует вариант макроса, который поддерживает выполнение обоих действий за один раз.
#![allow(unused)] fn main() { use tokio::{pin, select}; async fn my_async_fn() { // асинхронная логика здесь } pin! { let future1 = my_async_fn(); let future2 = my_async_fn(); } select! { _ = &mut future1 => {} _ = &mut future2 => {} } }
Закрепление нескольких значений
#![allow(unused)] fn main() { use tokio::pin; async fn task_one() -> i32 { 42 } async fn task_two() -> &'static str { "hello" } let fut1 = task_one(); let fut2 = task_two(); pin!(fut1, fut2); // Теперь можно использовать &mut fut1 и &mut fut2 с .await }
Практический пример с обработкой ошибок
use tokio::{pin, select}; use std::time::Duration; async fn network_request() -> Result<String, &'static str> { tokio::time::sleep(Duration::from_secs(1)).await; Ok("данные получены".to_string()) } async fn timeout() -> Result<(), &'static str> { tokio::time::sleep(Duration::from_secs(2)).await; Err("таймаут") } #[tokio::main] async fn main() { let request_future = network_request(); let timeout_future = timeout(); pin! { let request = request_future; let timeout = timeout_future; } select! { result = &mut request => { match result { Ok(data) => println!("Успех: {}", data), Err(e) => println!("Ошибка запроса: {}", e), } } _ = &mut timeout => { println!("Превышено время ожидания запроса"); } } }
Примечания
- Закрепление в стеке более эффективно, чем использование
Box::pin, так как избегает выделения памяти в куче - Закрепленные значения могут быть безопасно разыменованы для получения
&mut Tтолько через специальные интерфейсы - Макрос
pin!гарантирует, что закрепленное значение не может быть перемещено после закрепления
Макрос try_join
#![allow(unused)] fn main() { macro_rules! try_join { ($(biased;)? $($future:expr),*) => { ... }; } }
Доступно только с флагом функции macros.
Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда все ветви завершаются с Ok(_) или при первой Err(_).
Описание
Макрос try_join! должен использоваться внутри асинхронных функций, замыканий и блоков.
Аналогично join!, макрос try_join! принимает список асинхронных выражений и выполняет их конкурентно в одной и той же задаче. Каждое асинхронное выражение вычисляется в future, и future из каждого выражения мультиплексируются в текущей задаче. Макрос try_join! возвращает результат, когда все ветви возвращают Ok или когда первая ветвь возвращает Err.
Примечания
Предоставленные future хранятся inline и не требуют выделения Vec.
Характеристики времени выполнения
Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в try_join!.
Честность (Fairness)
По умолчанию сгенерированный future макроса try_join! чередует порядок опроса содержащихся future при каждом пробуждении.
Это поведение можно переопределить, добавив biased; в начало использования макроса. Это заставит try_join опрашивать future в порядке их следования сверху вниз.
Это может быть полезно, если ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение.
Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы объединяете поток и future завершения, и поток имеет огромный объем сообщений, обработка которых занимает много времени за один опрос, вам следует поместить future завершения раньше в списке try_join!, чтобы гарантировать, что он всегда будет опрашиваться и не будет задерживаться из-за того, что future потока долго возвращает Poll::Pending.
Примеры
Базовый try_join с двумя ветвями
#![allow(unused)] fn main() { async fn do_stuff_async() -> Result<(), &'static str> { // асинхронная работа Ok(()) } async fn more_async_work() -> Result<(), &'static str> { // больше асинхронной работы Ok(()) } let res = tokio::try_join!( do_stuff_async(), more_async_work() ); match res { Ok((first, second)) => { // делаем что-то со значениями println!("Обе операции завершились успешно"); } Err(err) => { println!("Обработка завершилась ошибкой; ошибка = {}", err); } } }
Использование try_join! с порожденными задачами
#![allow(unused)] fn main() { use tokio::task::JoinHandle; async fn do_stuff_async() -> Result<String, &'static str> { // асинхронная работа Ok("результат".to_string()) } async fn more_async_work() -> Result<i32, &'static str> { // больше асинхронной работы Ok(42) } async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => Err(err), Err(err) => Err("обработка задачи завершилась ошибкой"), } } let handle1 = tokio::spawn(do_stuff_async()); let handle2 = tokio::spawn(more_async_work()); match tokio::try_join!(flatten(handle1), flatten(handle2)) { Ok((text, number)) => { println!("Успех: текст = {}, число = {}", text, number); } Err(err) => { println!("Не удалось выполнить с ошибкой: {}.", err); } } }
Использование режима biased; для контроля порядка опроса
#![allow(unused)] fn main() { async fn do_stuff_async() -> Result<(), &'static str> { // асинхронная работа Ok(()) } async fn more_async_work() -> Result<(), &'static str> { // больше асинхронной работы Ok(()) } let res = tokio::try_join!( biased; do_stuff_async(), more_async_work() ); match res { Ok((first, second)) => { // делаем что-то со значениями } Err(err) => { println!("обработка завершилась ошибкой; ошибка = {}", err); } } }
Практический пример с различными типами результатов
#![allow(unused)] fn main() { async fn fetch_user() -> Result<String, &'static str> { // имитация запроса к API tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; Ok("Иван Иванов".to_string()) } async fn fetch_balance() -> Result<f64, &'static str> { // имитация запроса к базе данных tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; Ok(1000.50) } async fn validate_session() -> Result<bool, &'static str> { // проверка сессии Ok(true) } let result = tokio::try_join!( fetch_user(), fetch_balance(), validate_session() ); match result { Ok((user, balance, is_valid)) => { println!("Пользователь: {}, Баланс: {:.2}, Сессия действительна: {}", user, balance, is_valid); } Err(err) => { eprintln!("Ошибка при загрузке данных: {}", err); } } }
Обработка ошибок с досрочным завершением
#![allow(unused)] fn main() { async fn process_step1() -> Result<String, &'static str> { // шаг 1 обработки Ok("шаг1 завершен".to_string()) } async fn process_step2() -> Result<i32, &'static str> { // шаг 2 обработки - имитация ошибки Err("ошибка на шаге 2") } async fn process_step3() -> Result<bool, &'static str> { // этот шаг не будет выполнен из-за ошибки в step2 Ok(true) } let result = tokio::try_join!( process_step1(), process_step2(), process_step3() ); match result { Ok(_) => { println!("Все шаги завершены успешно"); } Err(err) => { println!("Обработка прервана на шаге с ошибкой: {}", err); // step3 не будет выполнен из-за досрочного завершения } } }
Использование с разными типами ошибок
#![allow(unused)] fn main() { use std::convert::From; // Функция для приведения разных типов ошибок к общему типу fn map_error<E: ToString>(err: E) -> String { err.to_string() } async fn api_call() -> Result<String, reqwest::Error> { // вызов API Ok("данные API".to_string()) } async fn db_query() -> Result<i32, sqlx::Error> { // запрос к базе данных Ok(42) } let result = tokio::try_join!( api_call().map_err(map_error), db_query().map_err(map_error) ); match result { Ok((api_data, db_data)) => { println!("API данные: {}, DB данные: {}", api_data, db_data); } Err(err) => { eprintln!("Ошибка: {}", err); } } }
Модули
Модуль doc
Доступно только на docs.rs и Unix системах.
Типы, которые документированы локально в крейте Tokio, но фактически не находятся здесь.
Обратите внимание, что этот модуль виден только на docs.rs, вы не можете использовать его напрямую в своем коде.
Модули
| Имя | Флаги | Описание |
|---|---|---|
os | net или fs | См. std::os. |
Перечисления
| Имя | Описание |
|---|---|
NotDefinedHere | Имя типа, который не определен здесь. |
Примечание по использованию
Этот модуль существует исключительно для целей документации и не предназначен для прямого использования в коде. Он служит для организации документации на сайте docs.rs и предоставления перекрестных ссылок между различными компонентами Tokio.
Фактические реализации типов, упомянутых в этом модуле, находятся в других местах крейта Tokio или в стандартной библиотеке Rust.
Модуль fs
Доступно только с флагом функции fs.
Асинхронные файловые утилиты.
Этот модуль содержит служебные методы для работы с файловой системой асинхронно. Это включает чтение/запись файлов и работу с каталогами.
Важное замечание
Имейте в виду, что большинство операционных систем не предоставляют асинхронные API файловой системы. Из-за этого Tokio будет использовать обычные блокирующие файловые операции "за кулисами". Это делается с использованием пула потоков spawn_blocking для их выполнения в фоновом режиме.
Модуль tokio::fs следует использовать только для обычных файлов. Попытка использовать его с, например, именованным каналом в Linux может привести к неожиданному поведению, такому как зависания при завершении работы среды выполнения. Для специальных файлов следует использовать специальные типы, такие как tokio::net::unix::pipe или AsyncFd.
В настоящее время Tokio всегда использует spawn_blocking на всех платформах, но в будущем это может быть изменено на использование асинхронных API файловой системы, таких как io_uring.
Использование
Самый простой способ использовать этот модуль — использовать служебные функции, которые работают с целыми файлами:
tokio::fs::readtokio::fs::read_to_stringtokio::fs::write
Две функции чтения считывают весь файл и возвращают его содержимое. Функция записи принимает содержимое файла и записывает это содержимое в файл. Она перезаписывает существующий файл, если он есть.
Например, чтобы прочитать файл:
#![allow(unused)] fn main() { let contents = tokio::fs::read_to_string("my_file.txt").await?; println!("Файл содержит {} строк.", contents.lines().count()); }
Чтобы перезаписать файл:
#![allow(unused)] fn main() { let contents = "Первая строка.\nВторая строка.\nТретья строка.\n"; tokio::fs::write("my_file.txt", contents.as_bytes()).await?; }
Использование File
Основной тип для взаимодействия с файлами — File. Он может использоваться для чтения и записи данного файла. Это делается с использованием трейтов AsyncRead и AsyncWrite. Этот тип обычно используется, когда вы хотите сделать что-то более сложное, чем просто прочитать или записать все содержимое за один раз.
Примечание: Важно использовать flush при записи в файл Tokio. Это связано с тем, что вызовы write возвращаются до завершения записи, а flush будет ждать завершения записи. (Запись произойдет, даже если вы не вызываете flush; это просто произойдет позже.) Это отличается от std::fs::File и связано с тем, что File использует spawn_blocking за кулисами.
Например, чтобы подсчитать количество строк в файле без загрузки всего файла в память:
#![allow(unused)] fn main() { use tokio::fs::File; use tokio::io::AsyncReadExt; let mut file = File::open("my_file.txt").await?; let mut chunk = vec![0; 4096]; let mut number_of_lines = 0; loop { let len = file.read(&mut chunk).await?; if len == 0 { // Длина ноль означает конец файла. break; } for &b in &chunk[..len] { if b == b'\n' { number_of_lines += 1; } } } println!("Файл содержит {} строк.", number_of_lines); }
Например, чтобы записать файл построчно:
#![allow(unused)] fn main() { use tokio::fs::File; use tokio::io::AsyncWriteExt; let mut file = File::create("my_file.txt").await?; file.write_all(b"Первая строка.\n").await?; file.write_all(b"Вторая строка.\n").await?; file.write_all(b"Третья строка.\n").await?; // Не забудьте вызвать `flush` после записи! file.flush().await?; }
Настройка производительности файлового ввода-вывода
Файлы Tokio используют spawn_blocking за кулисами, и это имеет серьезные последствия для производительности. Чтобы добиться хорошей производительности при файловом вводе-выводе в Tokio, рекомендуется объединять ваши операции в как можно меньше вызовов spawn_blocking.
Один пример этого различия можно увидеть, сравнив два приведенных выше примера чтения. Первый пример использует tokio::fs::read, который считывает весь файл за один вызов spawn_blocking, а затем возвращает его. Второй пример будет читать файл частями, используя множество вызовов spawn_blocking. Это означает, что второй пример, скорее всего, будет более затратным для больших файлов. (Конечно, использование частей может быть необходимо для очень больших файлов, которые не помещаются в памяти.)
Следующие примеры покажут некоторые стратегии для этого:
Запись всего содержимого за один раз
При создании файла записывайте данные в String или Vec<u8>, а затем записывайте весь файл одним вызовом spawn_blocking с помощью tokio::fs::write.
#![allow(unused)] fn main() { let mut contents = String::new(); contents.push_str("Первая строка.\n"); contents.push_str("Вторая строка.\n"); contents.push_str("Третья строка.\n"); tokio::fs::write("my_file.txt", contents.as_bytes()).await?; }
Использование буферизации
Используйте BufReader и BufWriter для буферизации множества мелких операций чтения или записи в несколько крупных. Этот пример, скорее всего, выполнит только один вызов spawn_blocking.
#![allow(unused)] fn main() { use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; let mut file = BufWriter::new(File::create("my_file.txt").await?); file.write_all(b"Первая строка.\n").await?; file.write_all(b"Вторая строка.\n").await?; file.write_all(b"Третья строка.\n").await?; // Из-за BufWriter фактическая запись и вызов spawn_blocking // происходят при вызове flush. file.flush().await?; }
Ручное использование std::fs внутри spawn_blocking
#![allow(unused)] fn main() { use std::fs::File; use std::io::{self, Write}; use tokio::task::spawn_blocking; spawn_blocking(move || { let mut file = File::create("my_file.txt")?; file.write_all(b"Первая строка.\n")?; file.write_all(b"Вторая строка.\n")?; file.write_all(b"Третья строка.\n")?; // В отличие от файла Tokio, файлу std::fs // не нужен flush. io::Result::Ok(()) }).await.unwrap()?; }
Также полезно знать о File::set_max_buf_size, который управляет максимальным количеством байт, которое файл Tokio будет читать или записывать за один вызов spawn_blocking. По умолчанию это два мегабайта, но это может измениться.
Структуры
| Имя | Описание |
|---|---|
DirBuilder | Построитель для создания каталогов различными способами. |
DirEntry | Записи, возвращаемые потоком ReadDir. |
File | Ссылка на открытый файл в файловой системе. |
OpenOptions | Опции и флаги, которые можно использовать для настройки открытия файла. |
ReadDir | Чтение записей в каталоге. |
Функции
| Имя | Описание |
|---|---|
canonicalize | Возвращает каноническую, абсолютную форму пути с нормализованными промежуточными компонентами и разрешенными символическими ссылками. |
copy | Копирует содержимое одного файла в другой. Также копирует биты разрешений исходного файла в файл назначения. Перезаписывает содержимое файла назначения. |
create_dir | Создает новый пустой каталог по указанному пути. |
create_dir_all | Рекурсивно создает каталог и все его родительские компоненты, если они отсутствуют. |
hard_link | Создает новую жесткую ссылку в файловой системе. |
metadata | Получает информацию о файле, каталоге и т.д. по указанному пути. |
read | Читает все содержимое файла в вектор байтов. |
read_dir | Возвращает поток записей в каталоге. |
read_link | Читает символическую ссылку, возвращая файл, на который она указывает. |
read_to_string | Создает future, который откроет файл для чтения и прочитает все содержимое в строку. |
remove_dir | Удаляет существующий пустой каталог. |
remove_dir_all | Удаляет каталог по этому пути, предварительно удалив все его содержимое. Используйте осторожно! |
remove_file | Удаляет файл из файловой системы. |
rename | Переименовывает файл или каталог в новое имя, заменяя исходный файл, если to уже существует. |
set_permissions | Изменяет разрешения файла или каталога. |
symlink | Unix: Создает новую символическую ссылку в файловой системе. |
symlink_dir | Windows: Создает новую символическую ссылку на каталог. |
symlink_file | Windows: Создает новую символическую ссылку на файл. |
symlink_metadata | Получает метаданные файловой системы для пути. |
try_exists | Возвращает Ok(true), если путь указывает на существующую сущность. |
write | Создает future, который откроет файл для записи и запишет все содержимое contents в него. |
Модуль io
Трейты, вспомогательные средства и определения типов для асинхронной функциональности ввода-вывода.
Этот модуль является асинхронной версией std::io. В основном он определяет два трейта: AsyncRead и AsyncWrite, которые являются асинхронными версиями трейтов Read и Write из стандартной библиотеки.
AsyncRead и AsyncWrite
Как и трейты Read и Write стандартной библиотеки, AsyncRead и AsyncWrite предоставляют наиболее общий интерфейс для чтения и записи ввода и вывода. Однако, в отличие от трейтов стандартной библиотеки, они асинхронны — это означает, что чтение из или запись в тип tokio::io будет уступать планировщику Tokio, когда ввод-вывод не готов, вместо блокировки. Это позволяет другим задачам выполняться во время ожидания ввода-вывода.
Другое отличие заключается в том, что AsyncRead и AsyncWrite содержат только основные методы, необходимые для предоставления асинхронной функциональности чтения и записи. Вместо этого служебные методы определены в трейтах-расширениях AsyncReadExt и AsyncWriteExt. Эти трейты автоматически реализуются для всех значений, которые реализуют AsyncRead и AsyncWrite соответственно.
Конечные пользователи редко будут взаимодействовать напрямую с AsyncRead и AsyncWrite. Вместо этого они будут использовать асинхронные функции, определенные в трейтах-расширениях. Ожидается, что авторы библиотек будут реализовывать AsyncRead и AsyncWrite, чтобы предоставлять типы, которые ведут себя как потоки байтов.
Даже с этими различиями, трейты AsyncRead и AsyncWrite Tokio можно использовать почти точно так же, как Read и Write стандартной библиотеки. Большинство типов в стандартной библиотеке, которые реализуют Read и Write, имеют асинхронные эквиваленты в tokio, которые реализуют AsyncRead и AsyncWrite, такие как File и TcpStream.
Например, документация стандартной библиотеки представляет Read, демонстрируя чтение некоторых байтов из std::fs::File. Мы можем сделать то же самое с tokio::fs::File:
use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = [0; 10]; // читаем до 10 байт let n = f.read(&mut buffer).await?; println!("Байты: {:?}", &buffer[..n]); Ok(()) }
Буферизированные читатели и писатели
Интерфейсы на основе байтов неудобны и могут быть неэффективными, так как нам потребуется делать почти постоянные вызовы к операционной системе. Чтобы помочь с этим, std::io поставляется с поддержкой буферизированных читателей и писателей, и, следовательно, tokio::io тоже.
Tokio предоставляет асинхронную версию трейта std::io::BufRead — AsyncBufRead; а также асинхронные структуры BufReader и BufWriter, которые оборачивают читателей и писателей. Эти обертки используют буфер, уменьшая количество вызовов и предоставляя более удобные методы для доступа именно к тому, что вам нужно.
Например, BufReader работает с трейтом AsyncBufRead, чтобы добавить дополнительные методы к любому асинхронному читателю:
use tokio::io::{self, BufReader, AsyncBufReadExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let f = File::open("foo.txt").await?; let mut reader = BufReader::new(f); let mut buffer = String::new(); // читаем строку в buffer reader.read_line(&mut buffer).await?; println!("{}", buffer); Ok(()) }
BufWriter не добавляет новых способов записи; он просто буферизует каждый вызов write. Однако вы должны вызывать flush для BufWriter, чтобы убедиться, что все буферизированные данные записаны.
use tokio::io::{self, BufWriter, AsyncWriteExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let f = File::create("foo.txt").await?; { let mut writer = BufWriter::new(f); // Записываем байт в буфер. writer.write(&[42u8]).await?; // Сбрасываем буфер перед выходом из области видимости. writer.flush().await?; } // Если не сброшено или не закрыто, содержимое буфера отбрасывается при удалении. Ok(()) }
Реализация AsyncRead и AsyncWrite
Поскольку это трейты, мы можем реализовать AsyncRead и AsyncWrite для наших собственных типов. Обратите внимание, что эти трейты должны реализовываться только для неблокирующих типов ввода-вывода, которые интегрируются с системой типов futures. Другими словами, эти типы никогда не должны блокировать поток, и вместо этого текущая задача уведомляется, когда ресурс ввода-вывода готов.
Преобразование в Stream/Sink и обратно
Часто удобно инкапсулировать чтение и запись байтов в Stream или Sink данных.
Tokio предоставляет простые обертки для преобразования AsyncRead в Stream и обратно в крейте tokio-util, см. ReaderStream и StreamReader.
Также существуют служебные трейты, которые абстрагируют асинхронную буферизацию, необходимую для написания собственных адаптеров для кодирования и декодирования байтов в/из ваших структурированных данных, позволяя преобразовать что-то, что реализует AsyncRead/AsyncWrite, в Stream/Sink, см. Decoder и Encoder в модуле tokio-util::codec.
Стандартный ввод и вывод
Tokio предоставляет асинхронные API для стандартного ввода, вывода и ошибки. Эти API очень похожи на предоставляемые std, но они также реализуют AsyncRead и AsyncWrite.
Обратите внимание, что API стандартного ввода/вывода должны использоваться в контексте среды выполнения Tokio, так как они требуют функций, специфичных для Tokio, чтобы функционировать. Вызов этих функций вне среды выполнения Tokio вызовет панику.
Реэкспорт из std
Дополнительно Error, ErrorKind, Result и SeekFrom реэкспортируются из std::io для удобства использования.
#![allow(unused)] fn main() { pub use std::io::Error; pub use std::io::ErrorKind; pub use std::io::Result; pub use std::io::SeekFrom; }
Модули
| Имя | Флаги | Описание |
|---|---|---|
bsd | FreeBSD и net | Специфичные для BSD типы ввода-вывода. |
unix | Unix и net | Асинхронные структуры ввода-вывода, специфичные для Unix-подобных ОС. |
Структуры
| Имя | Флаги | Описание |
|---|---|---|
BufReader | io-util | Добавляет буферизацию любому читателю. |
BufStream | io-util | Оборачивает тип, который является AsyncWrite и AsyncRead, и буферизует его ввод и вывод. |
BufWriter | io-util | Оборачивает писатель и буферизует его вывод. |
Chain | io-util | Поток для метода chain. |
DuplexStream | io-util | Дуплексный канал для чтения и записи байтов в памяти. |
Empty | io-util | Игнорирует любые данные, записанные через AsyncWrite, и всегда будет пуст (возвращает ноль байт) при чтении через AsyncRead. |
Interest | См. описание | Интерес события готовности. |
Join | io-util | Объединяет два значения, реализующих AsyncRead и AsyncWrite, в один дескриптор. |
Lines | io-util | Читает строки из AsyncBufRead. |
ReadBuf | Обертка вокруг байтового буфера, который постепенно заполняется и инициализируется. | |
ReadHalf | io-util | Читаемая половина значения, возвращенного из split. |
Ready | См. описание | Описывает состояние готовности ресурса ввода-вывода. |
Repeat | io-util | Асинхронный читатель, который бесконечно повторяет один байт. |
SimplexStream | io-util | Однонаправленный канал для чтения и записи байтов в памяти. |
Sink | io-util | Асинхронный писатель, который будет перемещать данные в пустоту. |
Split | io-util | Разделитель для метода split. |
Stderr | io-std | Дескриптор потока стандартной ошибки процесса. |
Stdin | io-std | Дескриптор потока стандартного ввода процесса. |
Stdout | io-std | Дескриптор потока стандартного вывода процесса. |
Take | io-util | Поток для метода take. |
WriteHalf | io-util | Записываемая половина значения, возвращенного из split. |
Трейты
| Имя | Флаги | Описание |
|---|---|---|
AsyncBufRead | Асинхронно читает байты. | |
AsyncBufReadExt | io-util | Трейт-расширение, добавляющий служебные методы к типам AsyncBufRead. |
AsyncRead | Читает байты из источника. | |
AsyncReadExt | io-util | Читает байты из источника. |
AsyncSeek | Асинхронно перемещается по байтам. | |
AsyncSeekExt | io-util | Трейт-расширение, добавляющий служебные методы к типам AsyncSeek. |
AsyncWrite | Асинхронно записывает байты. | |
AsyncWriteExt | io-util | Записывает байты в приемник. |
Функции
| Имя | Флаги | Описание |
|---|---|---|
copy | io-util | Асинхронно копирует все содержимое читателя в писатель. |
copy_bidirectional | io-util | Копирует данные в обоих направлениях между a и b. |
copy_bidirectional_with_sizes | io-util | Копирует данные в обоих направлениях между a и b с использованием буферов указанного размера. |
copy_buf | io-util | Асинхронно копирует все содержимое читателя в писатель. |
duplex | io-util | Создает новую пару DuplexStream, которые ведут себя как пара соединенных сокетов. |
empty | io-util | Создает значение, которое всегда находится в EOF при чтении и игнорирует все записанные данные. |
join | io-util | Объединяет два значения, реализующих AsyncRead и AsyncWrite, в один дескриптор. |
repeat | io-util | Создает экземпляр асинхронного читателя, который бесконечно повторяет один байт. |
simplex | io-util | Создает однонаправленный буфер, который действует как канал в памяти. |
sink | io-util | Создает экземпляр асинхронного писателя, который успешно потребляет все данные. |
split | io-util | Разделяет одно значение, реализующее AsyncRead + AsyncWrite, на отдельные дескрипторы AsyncRead и AsyncWrite. |
stderr | io-std | Создает новый дескриптор стандартной ошибки текущего процесса. |
stdin | io-std | Создает новый дескриптор стандартного ввода текущего процесса. |
stdout | io-std | Создает новый дескриптор стандартного вывода текущего процесса. |
Модуль net
Доступно только в non-loom сборках.
TCP/UDP/Unix привязки для tokio.
Этот модуль содержит сетевые типы TCP/UDP/Unix, аналогичные стандартной библиотеке, которые могут использоваться для реализации сетевых протоколов.
Организация
TcpListenerиTcpStreamпредоставляют функциональность для связи по TCPUdpSocketпредоставляет функциональность для связи по UDPUnixListenerиUnixStreamпредоставляют функциональность для связи через Unix Domain Stream Socket (доступно только в Unix)UnixDatagramпредоставляет функциональность для связи через Unix Domain Datagram Socket (доступно только в Unix)tokio::net::unix::pipeдля FIFO каналов (доступно только в Unix)tokio::net::windows::named_pipeдля именованных каналов (доступно только в Windows)
Для ресурсов ввода-вывода, недоступных в tokio::net, вы можете использовать AsyncFd.
Модули
| Имя | Флаги | Описание |
|---|---|---|
tcp | net | Типы утилит TCP. |
unix | Unix и net | Специфичные для Unix сетевые типы. |
windows | Windows и net | Специфичные для Windows сетевые типы. |
Структуры
| Имя | Флаги | Описание |
|---|---|---|
TcpListener | net | TCP серверный сокет, прослушивающий подключения. |
TcpSocket | Non-WASI | TCP сокет, который еще не был преобразован в TcpStream или TcpListener. |
TcpStream | net | TCP поток между локальным и удаленным сокетом. |
UdpSocket | net | UDP сокет. |
UnixDatagram | Unix и net | Объект ввода-вывода, представляющий Unix датаграммный сокет. |
UnixListener | Unix и net | Unix сокет, который может принимать подключения от других Unix сокетов. |
UnixSocket | Unix и net | Unix сокет, который еще не был преобразован в UnixStream, UnixDatagram или UnixListener. |
UnixStream | Unix и net | Структура, представляющая подключенный Unix сокет. |
Трейты
| Имя | Описание |
|---|---|
ToSocketAddrs | Преобразует или разрешает (без блокировки) в одно или несколько значений SocketAddr. |
Функции
| Имя | Флаги | Описание |
|---|---|---|
lookup_host | net | Выполняет DNS разрешение. |
Модуль process
Доступно только с флагом функции process.
Реализация асинхронного управления процессами для Tokio.
Этот модуль предоставляет структуру Command, которая имитирует интерфейс типа std::process::Command из стандартной библиотеки, но предоставляет асинхронные версии функций, создающих процессы. Эти функции (spawn, status, output и их варианты) возвращают типы, "осведомленные" о future, которые взаимодействуют с Tokio. Поддержка асинхронных процессов обеспечивается через обработку сигналов в Unix и системные API в Windows.
Примеры
Вот пример программы, которая запустит echo hello world и затем будет ждать его завершения.
use tokio::process::Command; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Использование аналогично типу `Command` стандартной библиотеки let mut child = Command::new("echo") .arg("hello") .arg("world") .spawn() .expect("не удалось запустить"); // Ожидаем завершения команды let status = child.wait().await?; println!("команда завершилась с: {}", status); Ok(()) }
Теперь рассмотрим пример, где мы не только запускаем echo hello world, но и захватываем его вывод.
use tokio::process::Command; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Как и выше, но используем `output`, который возвращает future вместо // немедленного возврата `Child`. let output = Command::new("echo").arg("hello").arg("world") .output(); let output = output.await?; assert!(output.status.success()); assert_eq!(output.stdout, b"hello world\n"); Ok(()) }
Мы также можем читать ввод построчно.
use tokio::io::{BufReader, AsyncBufReadExt}; use tokio::process::Command; use std::process::Stdio; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut cmd = Command::new("cat"); // Указываем, что хотим получить стандартный вывод команды через канал. // По умолчанию стандартный ввод/вывод/ошибка наследуются от текущего процесса // (например, это означает, что стандартный ввод будет поступать с клавиатуры, // а стандартный вывод/ошибка будут направляться непосредственно в терминал, // если этот процесс вызван из командной строки). cmd.stdout(Stdio::piped()); let mut child = cmd.spawn() .expect("не удалось запустить команду"); let stdout = child.stdout.take() .expect("дочерний процесс не имел дескриптора stdout"); let mut reader = BufReader::new(stdout).lines(); // Убеждаемся, что дочерний процесс запущен в среде выполнения, чтобы он мог // самостоятельно прогрессировать, пока мы ожидаем какой-либо вывод. tokio::spawn(async move { let status = child.wait().await .expect("дочерний процесс столкнулся с ошибкой"); println!("статус дочернего процесса: {}", status); }); while let Some(line) = reader.next_line().await? { println!("Строка: {}", line); } Ok(()) }
Вот еще один пример использования sort с записью в стандартный ввод дочернего процесса и захватом вывода отсортированного текста.
use tokio::io::AsyncWriteExt; use tokio::process::Command; use std::process::Stdio; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut cmd = Command::new("sort"); // Указываем, что хотим канал как для вывода, так и для ввода. // Аналогично захвату вывода, настройка канала для stdin позволяет // использовать его как асинхронный писатель. cmd.stdout(Stdio::piped()); cmd.stdin(Stdio::piped()); let mut child = cmd.spawn().expect("не удалось запустить команду"); // Животные, которые мы хотим отсортировать let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"]; let mut stdin = child .stdin .take() .expect("дочерний процесс не имел дескриптора stdin"); // Записываем наших животных в дочерний процесс // Обратите внимание, что поведение `sort` - буферизировать _весь ввод_ перед записью любого вывода. // В общем смысле рекомендуется писать в дочерний процесс в отдельной задаче, // одновременно ожидая его завершения (или вывода), чтобы избежать взаимоблокировок // (например, дочерний процесс пытается записать какой-то вывод, но зависает в ожидании, // пока родительский процесс прочитает его, в то время как родительский процесс // завис в ожидании полной записи своего ввода перед чтением вывода). stdin .write(animals.join("\n").as_bytes()) .await .expect("не удалось записать в stdin"); // Мы удаляем дескриптор здесь, что сигнализирует EOF дочернему процессу. // Это сообщает дочернему процессу, что в канале больше нет данных. drop(stdin); let op = child.wait_with_output().await?; // Результаты должны вернуться в отсортированном порядке assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes()); Ok(()) }
С некоторой координацией мы также можем передавать вывод одной команды в другую.
use tokio::join; use tokio::process::Command; use std::process::Stdio; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut echo = Command::new("echo") .arg("hello world!") .stdout(Stdio::piped()) .spawn() .expect("не удалось запустить echo"); let tr_stdin: Stdio = echo .stdout .take() .unwrap() .try_into() .expect("не удалось преобразовать в Stdio"); let tr = Command::new("tr") .arg("a-z") .arg("A-Z") .stdin(tr_stdin) .stdout(Stdio::piped()) .spawn() .expect("не удалось запустить tr"); let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output()); assert!(echo_result.unwrap().success()); let tr_output = tr_output.expect("не удалось дождаться tr"); assert!(tr_output.status.success()); assert_eq!(tr_output.stdout, b"HELLO WORLD!\n"); Ok(()) }
Особенности
Удаление/Отмена
Аналогично поведению стандартной библиотеки и в отличие от парадигмы futures, где удаление подразумевает отмену, запущенный процесс по умолчанию продолжит выполняться даже после удаления дескриптора Child.
Метод Command::kill_on_drop может быть использован для изменения этого поведения и убийства дочернего процесса, если обертка Child удалена до его завершения.
Процессы Unix
В Unix-платформах процессы должны быть "убраны" (reaped) их родительским процессом после завершения, чтобы освободить все ресурсы ОС. Дочерний процесс, который завершился, но еще не был убран своим родителем, считается "зомби"-процессом. Такие процессы продолжают учитываться в ограничениях, накладываемых системой, и наличие слишком большого количества зомби-процессов может препятствовать запуску дополнительных процессов.
Среда выполнения Tokio будет пытаться убирать и очищать любые процессы, которые она запустила, на основе принципа "максимального усилия". Не дается дополнительных гарантий относительно того, насколько быстро или как часто эта процедура будет выполняться.
Рекомендуется избегать удаления дескриптора дочернего процесса до его полного ожидания, если требуются более строгие гарантии очистки.
Структуры
| Имя | Описание |
|---|---|
Child | Представление дочернего процесса, запущенного в цикле событий. |
ChildStderr | Поток стандартной ошибки для запущенных дочерних процессов. |
ChildStdin | Поток стандартного ввода для запущенных дочерних процессов. |
ChildStdout | Поток стандартного вывода для запущенных дочерних процессов. |
Command | Эта структура имитирует API std::process::Command из стандартной библиотеки, но заменяет функции, создающие процесс, асинхронными вариантами. Основные предоставляемые асинхронные функции - spawn, status и output. |
Модуль runtime
Доступно только с флагом функции rt.
Среда выполнения Tokio.
В отличие от других программ на Rust, асинхронные приложения требуют поддержки среды выполнения. В частности, необходимы следующие службы среды выполнения:
- Цикл событий ввода-вывода, называемый драйвером, который управляет ресурсами ввода-вывода и распределяет события ввода-вывода между задачами, которые от них зависят.
- Планировщик для выполнения задач, использующих эти ресурсы ввода-вывода.
- Таймер для планирования работы, которая должна выполниться через заданный промежуток времени.
Runtime Tokio объединяет все эти службы в один тип, позволяя запускать, останавливать и настраивать их вместе. Однако часто не требуется настраивать Runtime вручную, и пользователь может просто использовать атрибут-макрос tokio::main, который создает Runtime "под капотом".
Выбор среды выполнения
Вот эмпирические правила для выбора правильной среды выполнения для вашего приложения:
+------------------------------------------------------+
| Нужен ли планировщик с work-stealing или многопоточный? |
+------------------------------------------------------+
| Да | Нет
| |
| |
v |
+------------------------+ |
| Многопоточный Runtime | |
+------------------------+ |
|
V
+--------------------------------+
| Выполняете ли вы `!Send` Future? |
+--------------------------------+
| Да | Нет
| |
V |
+--------------------------+ |
| Local Runtime (нестабильный) | |
+--------------------------+ |
|
v
+------------------------+
| Текущий поток Runtime |
+------------------------+
Приведенное дерево решений не является исчерпывающим. Существуют другие факторы, которые могут повлиять на ваше решение.
Связывание с синхронным кодом
Подробности см. на https://tokio.rs/tokio/topics/bridging.
Осведомленность о NUMA
Среда выполнения Tokio не осведомлена о NUMA (Non-Uniform Memory Access). Для лучшей производительности на системах с NUMA вы можете захотеть запустить несколько сред выполнения вместо одной.
Использование
Когда точная настройка не требуется, можно использовать атрибут-макрос tokio::main.
use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; 1024]; // В цикле читаем данные из сокета и записываем данные обратно. loop { let n = match socket.read(&mut buf).await { // сокет закрыт Ok(0) => return, Ok(n) => n, Err(e) => { println!("не удалось прочитать из сокета; err = {:?}", e); return; } }; // Записываем данные обратно if let Err(e) = socket.write_all(&buf[0..n]).await { println!("не удалось записать в сокет; err = {:?}", e); return; } } }); } }
Из контекста среды выполнения дополнительные задачи порождаются с помощью функции tokio::spawn. Future, порожденные с помощью этой функции, будут выполняться в том же пуле потоков, который используется Runtime.
Экземпляр Runtime также может использоваться напрямую.
use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::runtime::Runtime; fn main() -> Result<(), Box<dyn std::error::Error>> { // Создаем среду выполнения let rt = Runtime::new()?; // Порождаем корневую задачу rt.block_on(async { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; 1024]; // В цикле читаем данные из сокета и записываем данные обратно. loop { let n = match socket.read(&mut buf).await { // сокет закрыт Ok(0) => return, Ok(n) => n, Err(e) => { println!("не удалось прочитать из сокета; err = {:?}", e); return; } }; // Записываем данные обратно if let Err(e) = socket.write_all(&buf[0..n]).await { println!("не удалось записать в сокет; err = {:?}", e); return; } } }); } }) }
Конфигурации среды выполнения
Tokio предоставляет несколько стратегий планирования задач, подходящих для разных приложений. Для выбора планировщика можно использовать построитель среды выполнения или атрибут #[tokio::main].
Многопоточный планировщик
Многопоточный планировщик выполняет future в пуле потоков, используя стратегию work-stealing. По умолчанию он запускает рабочий поток для каждого доступного ядра CPU в системе. Это, как правило, идеальная конфигурация для большинства приложений. Многопоточный планировщик требует флага функции rt-multi-thread и выбирается по умолчанию:
#![allow(unused)] fn main() { use tokio::runtime; let threaded_rt = runtime::Runtime::new()?; }
Большинство приложений должны использовать многопоточный планировщик, за исключением некоторых узких случаев использования, таких как когда требуется выполнение только в одном потоке.
Планировщик текущего потока
Планировщик текущего потока предоставляет однопоточный исполнитель future. Все задачи будут создаваться и выполняться в текущем потоке. Это требует флага функции rt.
#![allow(unused)] fn main() { use tokio::runtime; let rt = runtime::Builder::new_current_thread() .build()?; }
Драйверы ресурсов
При ручной настройке среды выполнения никакие драйверы ресурсов не включены по умолчанию. В этом случае попытка использования сетевых типов или типов времени завершится неудачей. Чтобы включить эти типы, необходимо включить драйверы ресурсов. Это делается с помощью Builder::enable_io и Builder::enable_time. В качестве сокращения Builder::enable_all включает оба драйвера ресурсов.
Время жизни порожденных потоков
Среда выполнения может порождать потоки в зависимости от ее конфигурации и использования. Многопоточный планировщик порождает потоки для планирования задач и для вызовов spawn_blocking.
Пока Runtime активен, потоки могут завершаться после периодов простоя. После удаления Runtime все потоки среды выполнения обычно завершаются, но при наличии непрекращаемой порожденной работы не гарантируется, что они были завершены. Подробнее см. документацию на уровне структуры.
Подробное поведение среды выполнения
Этот раздел дает более подробную информацию о том, как среда выполнения Tokio будет планировать задачи для выполнения.
На самом базовом уровне среда выполнения имеет коллекцию задач, которые нужно запланировать. Она будет повторно удалять задачу из этой коллекции и планировать ее (вызывая poll). Когда коллекция пуста, поток перейдет в спящий режим до тех пор, пока в коллекцию не будет добавлена задача.
Однако вышесказанного недостаточно для гарантии корректного поведения среды выполнения. Например, среда выполнения может иметь одну задачу, которая всегда готова к планированию, и планировать эту задачу каждый раз. Это проблема, потому что это "морит голодом" другие задачи, не планируя их. Чтобы решить эту проблему, Tokio предоставляет следующую гарантию честности:
Если общее количество задач не растет безгранично и никакая задача не блокирует поток, то гарантируется, что задачи планируются честно.
Или, более формально:
При следующих двух предположениях:
- Существует некоторое число
MAX_TASKS, такое что общее количество задач в среде выполнения в любой конкретный момент времени никогда не превышаетMAX_TASKS.- Существует некоторое число
MAX_SCHEDULE, такое что вызовpollдля любой задачи, порожденной в среде выполнения, возвращается в течениеMAX_SCHEDULEединиц времени.Тогда существует некоторое число
MAX_DELAY, такое что когда задача пробуждается, она будет запланирована средой выполнения в течениеMAX_DELAYединиц времени.
(Здесь MAX_TASKS и MAX_SCHEDULE могут быть любыми числами, и пользователь среды выполнения может выбрать их. Число MAX_DELAY контролируется средой выполнения и зависит от значения MAX_TASKS и MAX_SCHEDULE.)
Помимо приведенной выше гарантии честности, нет никаких гарантий относительно порядка, в котором задачи планируются. Также нет гарантии, что среда выполнения одинаково честна ко всем задачам. Например, если у среды выполнения есть две задачи A и B, которые обе готовы, то среда выполнения может запланировать A пять раз, прежде чем запланирует B. Это верно, даже если A уступает с помощью yield_now. Все, что гарантируется, это то, что она запланирует B в конечном итоге.
Обычно задачи планируются только если они были пробуждены вызовом wake на их waker. Однако это не гарантируется, и Tokio может планировать задачи, которые не были пробуждены, при некоторых обстоятельствах. Это называется ложным пробуждением (spurious wakeup).
Ввод-вывод и таймеры
Помимо планирования задач, среда выполнения также должна управлять ресурсами ввода-вывода и таймерами. Она делает это, периодически проверяя, есть ли какие-либо ресурсы ввода-вывода или таймеры, которые готовы, и пробуждая соответствующую задачу, чтобы она была запланирована.
Эти проверки выполняются периодически между планированием задач. При тех же предположениях, что и предыдущая гарантия честности, Tokio гарантирует, что она пробудит задачи с событием ввода-вывода или таймера в течение некоторого максимального количества единиц времени.
Реэкспорт
#![allow(unused)] fn main() { pub use dump::Dump; // `tokio_unstable` и `taskdump` и `Linux` и (`AArch64` или `x86` или `x86-64`) }
Модули
| Имя | Флаги | Описание |
|---|---|---|
dump | tokio_unstable и taskdump и Linux и (AArch64 или x86 или x86-64) | Снимки состояния среды выполнения. |
Структуры
| Имя | Флаги | Описание |
|---|---|---|
Builder | Строит Tokio Runtime с пользовательскими значениями конфигурации. | |
EnterGuard | Защита контекста среды выполнения. | |
Handle | Дескриптор среды выполнения. | |
HistogramConfiguration | tokio_unstable | Конфигурация для гистограммы количества опросов. |
Id | tokio_unstable | Непрозрачный ID, однозначно идентифицирующий среду выполнения относительно всех других текущих сред выполнения. |
LocalOptions | tokio_unstable | Опции конфигурации только для LocalRuntime. |
LocalRuntime | tokio_unstable | Локальная среда выполнения Tokio. |
LogHistogram | tokio_unstable | Логарифмическая гистограмма. |
LogHistogramBuilder | tokio_unstable | Конфигурация для LogHistogram. |
RngSeed | tokio_unstable | Seed для генерации случайных чисел. |
Runtime | Среда выполнения Tokio. | |
RuntimeMetrics | Дескриптор метрик среды выполнения. | |
TaskMeta | tokio_unstable | Метаданные задачи, предоставляемые пользовательскими хуками для событий задач. |
TryCurrentError | Ошибка, возвращаемая try_current, когда среда выполнения не запущена. |
Перечисления
| Имя | Флаги | Описание |
|---|---|---|
HistogramScale | tokio_unstable | Использует ли гистограмма для агрегации метрики линейную или логарифмическую шкалу. |
InvalidHistogramConfiguration | tokio_unstable | Ошибка построения гистограммы. |
RuntimeFlavor | Тип (flavor) среды выполнения. | |
UnhandledPanic | tokio_unstable | Как среда выполнения должна реагировать на необработанные паники. |
Модуль signal
Доступно только с флагом функции signal.
Асинхронная обработка сигналов для Tokio.
Обратите внимание, что обработка сигналов в целом является очень сложной темой и должна использоваться с большой осторожностью. Этот крейт пытается реализовать "лучшие практики" для обработки сигналов, но его следует оценить с учетом потребностей ваших собственных приложений, чтобы убедиться в его пригодности.
Также существуют некоторые фундаментальные ограничения этого крейта, документированные в OS-специфичных структурах.
Примеры
Вывод при получении уведомления "ctrl-c"
use tokio::signal; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { signal::ctrl_c().await?; println!("получен ctrl-c!"); Ok(()) }
Ожидание SIGHUP в Unix
use tokio::signal::unix::{signal, SignalKind}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Бесконечный поток сигналов hangup. let mut stream = signal(SignalKind::hangup())?; // Выводим сообщение при каждом получении сигнала HUP loop { stream.recv().await; println!("получен сигнал HUP"); } }
Модули
| Имя | Флаги | Описание |
|---|---|---|
unix | Unix | Специфичные для Unix типы для обработки сигналов. |
windows | Windows | Специфичные для Windows типы для обработки сигналов. |
Функции
| Имя | Описание |
|---|---|
ctrl_c | Завершается, когда в процесс отправляется уведомление "ctrl-c". |
Модуль stream
Из-за того, что включение трейта Stream в std произошло позже выпуска Tokio 1.0, большинство утилит потоков Tokio были перемещены в крейт tokio-stream.
Почему Stream не был включен в Tokio 1.0?
Изначально мы планировали выпустить Tokio 1.0 со стабильным типом Stream, но, к сожалению, RFC не был вовремя принят, чтобы Stream попал в std на стабильном компиляторе к моменту выпуска Tokio 1.0. По этой причине команда решила переместить все утилиты на основе Stream в крейт tokio-stream. Хотя это не идеально, как только Stream попадет в стандартную библиотеку и период MSRV истечет, мы реализуем stream для наших различных типов.
Хотя это может показаться неудачным, не все потеряно, так как вы можете получить большую часть поддержки Stream с помощью async/await и циклов while let. Также возможно создать impl Stream из async fn с помощью крейта async-stream.
Пример
Преобразование sync::mpsc::Receiver в impl Stream
#![allow(unused)] fn main() { use tokio::sync::mpsc; let (tx, mut rx) = mpsc::channel::<usize>(16); let stream = async_stream::stream! { while let Some(item) = rx.recv().await { yield item; } }; }
Использование tokio-stream для дополнительной функциональности
#![allow(unused)] fn main() { use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tokio::sync::mpsc; let (tx, rx) = mpsc::channel::<usize>(16); // Преобразование Receiver в Stream с помощью tokio-stream let mut stream = ReceiverStream::new(rx); // Использование методов StreamExt while let Some(item) = stream.next().await { println!("получен элемент: {}", item); } }
Создание пользовательского Stream с async-stream
#![allow(unused)] fn main() { use async_stream::stream; use std::time::Duration; use tokio::time::sleep; let mut counter = 0; let stream = stream! { for _ in 0..5 { sleep(Duration::from_secs(1)).await; counter += 1; yield counter; } }; // Использование созданного потока tokio::pin!(stream); while let Some(value) = stream.next().await { println!("значение: {}", value); } }
Рекомендации
Для работы с потоками в Tokio рекомендуется:
- Использовать крейт
tokio-streamдля дополнительных утилит потоков - Использовать
async-streamдля создания пользовательских потоков - Использовать комбинацию
async/awaitи цикловwhile letдля базовой обработки потоков - Следить за обновлениями относительно включения
Streamв стандартную библиотеку
Альтернативы
До стабилизации Stream в std можно использовать следующие подходы:
- Использовать
futures::Streamиз крейтаfutures - Использовать
async-streamдля создания потоков - Использовать циклы
while letс асинхронными итераторами - Использовать каналы Tokio (
mpsc,broadcastи др.) как потоки данных
Модуль sync
Доступно только с флагом функции sync.
Примитивы синхронизации для использования в асинхронных контекстах.
Программы Tokio обычно организованы как набор задач, где каждая задача работает независимо и может выполняться на отдельных физических потоках. Примитивы синхронизации, предоставляемые в этом модуле, позволяют этим независимым задачам взаимодействовать друг с другом.
Передача сообщений
Наиболее распространенной формой синхронизации в программе Tokio является передача сообщений. Две задачи работают независимо и отправляют сообщения друг другу для синхронизации. Это имеет преимущество в виде избегания разделяемого состояния.
Передача сообщений реализована с использованием каналов. Канал поддерживает отправку сообщения от одной задачи-производителя к одной или нескольким задачам-потребителям. Tokio предоставляет несколько разновидностей каналов. Каждая разновидность канала поддерживает различные шаблоны передачи сообщений. Когда канал поддерживает несколько производителей, многие отдельные задачи могут отправлять сообщения. Когда канал поддерживает несколько потребителей, многие различные отдельные задачи могут получать сообщения.
Tokio предоставляет множество различных разновидностей каналов, поскольку различные шаблоны передачи сообщений лучше обрабатываются разными реализациями.
Канал oneshot
Канал oneshot поддерживает отправку одного значения от одного производителя к одному потребителю. Этот канал обычно используется для отправки результата вычисления ожидающей стороне.
Пример: использование канала oneshot для получения результата вычисления.
#![allow(unused)] fn main() { use tokio::sync::oneshot; async fn some_computation() -> String { "представляет результат вычисления".to_string() } let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let res = some_computation().await; tx.send(res).unwrap(); }); // Делаем другую работу, пока вычисление происходит в фоне // Ожидаем результат вычисления let res = rx.await.unwrap(); }
Примечание: если задача производит результат вычисления как свое последнее действие перед завершением, можно использовать JoinHandle для получения этого значения вместо выделения ресурсов для канала oneshot. Ожидание JoinHandle возвращает Result. Если задача паникует, JoinHandle возвращает Err с причиной паники.
Пример:
#![allow(unused)] fn main() { async fn some_computation() -> String { "результат вычисления".to_string() } let join_handle = tokio::spawn(async move { some_computation().await }); // Делаем другую работу, пока вычисление происходит в фоне // Ожидаем результат вычисления let res = join_handle.await.unwrap(); }
Канал mpsc
Канал mpsc поддерживает отправку многих значений от многих производителей к одному потребителю. Этот канал часто используется для отправки работы задаче или для получения результата многих вычислений.
Это также канал, который вы должны использовать, если хотите отправлять много сообщений от одного производителя к одному потребителю. Нет специального канала spsc.
Пример: использование mpsc для постепенной потоковой передачи результатов серии вычислений.
#![allow(unused)] fn main() { use tokio::sync::mpsc; async fn some_computation(input: u32) -> String { format!("результат вычисления {}", input) } let (tx, mut rx) = mpsc::channel(100); tokio::spawn(async move { for i in 0..10 { let res = some_computation(i).await; tx.send(res).await.unwrap(); } }); while let Some(res) = rx.recv().await { println!("получено = {}", res); } }
Аргумент mpsc::channel - это емкость канала. Это максимальное количество значений, которые могут храниться в канале в ожидании получения в любой момент времени. Правильная установка этого значения является ключевой для реализации надежных программ, поскольку емкость канала играет критическую роль в обработке обратного давления.
Распространенным шаблоном параллелизма для управления ресурсами является порождение задачи, посвященной управлению этим ресурсом, и использование передачи сообщений между другими задачами для взаимодействия с ресурсом. Ресурсом может быть что угодно, что не может использоваться конкурентно. Некоторые примеры включают сокет и состояние программы. Например, если нескольким задачам нужно отправлять данные через один сокет, породите задачу для управления сокетом и используйте канал для синхронизации.
Пример: отправка данных из многих задач через один сокет с использованием передачи сообщений.
use tokio::io::{self, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::mpsc; #[tokio::main] async fn main() -> io::Result<()> { let mut socket = TcpStream::connect("www.example.com:1234").await?; let (tx, mut rx) = mpsc::channel(100); for _ in 0..10 { // Каждой задаче нужен собственный дескриптор `tx`. Это делается путем клонирования // исходного дескриптора. let tx = tx.clone(); tokio::spawn(async move { tx.send(&b"данные для записи"[..]).await.unwrap(); }); } // Половина `rx` канала возвращает `None`, когда **все** клоны `tx` // удалены. Чтобы гарантировать возврат `None`, удалите дескриптор, принадлежащий // текущей задаче. Если этот дескриптор `tx` не удален, всегда будет // один оставшийся дескриптор `tx`. drop(tx); while let Some(res) = rx.recv().await { socket.write_all(res).await?; } Ok(()) }
Каналы mpsc и oneshot можно комбинировать для предоставления шаблона синхронизации типа запрос/ответ с разделяемым ресурсом. Задача порождается для синхронизации ресурса и ожидает команды, полученные по каналу mpsc. Каждая команда включает oneshot Sender, на который отправляется результат команды.
Пример: использование задачи для синхронизации счетчика u64. Каждая задача отправляет команду "получить и увеличить". Значение счетчика до увеличения отправляется через предоставленный канал oneshot.
#![allow(unused)] fn main() { use tokio::sync::{oneshot, mpsc}; use Command::Increment; enum Command { Increment, // Здесь можно добавить другие команды } let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100); // Порождение задачи для управления счетчиком tokio::spawn(async move { let mut counter: u64 = 0; while let Some((cmd, response)) = cmd_rx.recv().await { match cmd { Increment => { let prev = counter; counter += 1; response.send(prev).unwrap(); } } } }); let mut join_handles = vec![]; // Порождение задач, которые будут отправлять команду увеличения. for _ in 0..10 { let cmd_tx = cmd_tx.clone(); join_handles.push(tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); cmd_tx.send((Increment, resp_tx)).await.ok().unwrap(); let res = resp_rx.await.unwrap(); println!("предыдущее значение = {}", res); })); } // Ожидание завершения всех задач for join_handle in join_handles.drain(..) { join_handle.await.unwrap(); } }
Канал broadcast
Канал broadcast поддерживает отправку многих значений от многих производителей ко многим потребителям. Каждый потребитель получит каждое значение. Этот канал можно использовать для реализации шаблонов типа "fan out", распространенных в системах pub/sub или "chat".
Этот канал используется реже, чем oneshot и mpsc, но все еще имеет свои случаи использования.
Это также канал, который вы должны использовать, если хотите транслировать значения от одного производителя ко многим потребителям. Нет специального канала spmc broadcast.
Базовое использование:
#![allow(unused)] fn main() { use tokio::sync::broadcast; let (tx, mut rx1) = broadcast::channel(16); let mut rx2 = tx.subscribe(); tokio::spawn(async move { assert_eq!(rx1.recv().await.unwrap(), 10); assert_eq!(rx1.recv().await.unwrap(), 20); }); tokio::spawn(async move { assert_eq!(rx2.recv().await.unwrap(), 10); assert_eq!(rx2.recv().await.unwrap(), 20); }); tx.send(10).unwrap(); tx.send(20).unwrap(); }
Канал watch
Канал watch поддерживает отправку многих значений от многих производителей ко многим потребителям. Однако в канале хранится только самое последнее значение. Потребители уведомляются, когда отправляется новое значение, но нет гарантии, что потребители увидят все значения.
Канал watch похож на канал broadcast с емкостью 1.
Случаи использования канала watch включают трансляцию изменений конфигурации или сигнализацию изменений состояния программы, таких как переход к завершению работы.
Пример: использование канала watch для уведомления задач об изменениях конфигурации. В этом примере файл конфигурации проверяется периодически. Когда файл изменяется, изменения конфигурации сигнализируются потребителям.
#![allow(unused)] fn main() { use tokio::sync::watch; use tokio::time::{self, Duration, Instant}; use std::io; #[derive(Debug, Clone, Eq, PartialEq)] struct Config { timeout: Duration, } impl Config { async fn load_from_file() -> io::Result<Config> { // логика загрузки файла и десериализации здесь } } async fn my_async_operation() { // Делаем что-то здесь } // Загружаем начальное значение конфигурации let mut config = Config::load_from_file().await.unwrap(); // Создаем канал watch, инициализированный загруженной конфигурацией let (tx, rx) = watch::channel(config.clone()); // Порождение задачи для мониторинга файла. tokio::spawn(async move { loop { // Ждем 10 секунд между проверками time::sleep(Duration::from_secs(10)).await; // Загружаем файл конфигурации let new_config = Config::load_from_file().await.unwrap(); // Если конфигурация изменилась, отправляем новое значение конфигурации // по каналу watch. if new_config != config { tx.send(new_config.clone()).unwrap(); config = new_config; } } }); let mut handles = vec![]; // Порождение задач, которые выполняют асинхронную операцию не более `timeout`. Если // время ожидания истекает, перезапускаем операцию. // // Задача одновременно отслеживает изменения `Config`. Когда // длительность таймаута изменяется, таймаут обновляется без перезапуска // выполняемой операции. for _ in 0..5 { // Клонируем дескриптор watch конфигурации для использования в этой задаче let mut rx = rx.clone(); let handle = tokio::spawn(async move { // Начинаем первоначальную операцию и закрепляем future в стеке. // Закрепление в стеке требуется для возобновления операции // при множественных вызовах `select!` let op = my_async_operation(); tokio::pin!(op); // Получаем начальное значение конфигурации let mut conf = rx.borrow().clone(); let mut op_start = Instant::now(); let sleep = time::sleep_until(op_start + conf.timeout); tokio::pin!(sleep); loop { tokio::select! { _ = &mut sleep => { // Операция истекла. Перезапускаем ее op.set(my_async_operation()); // Отслеживаем новое время начала op_start = Instant::now(); // Перезапускаем таймаут sleep.set(time::sleep_until(op_start + conf.timeout)); } _ = rx.changed() => { conf = rx.borrow_and_update().clone(); // Конфигурация была обновлена. Обновляем // `sleep` используя новое значение `timeout`. sleep.as_mut().reset(op_start + conf.timeout); } _ = &mut op => { // Операция завершена! return } } } }); handles.push(handle); } for handle in handles.drain(..) { handle.await.unwrap(); } }
Синхронизация состояния
Оставшиеся примитивы синхронизации сосредоточены на синхронизации состояния. Это асинхронные эквиваленты версий, предоставляемых std. Они работают аналогично своим аналогам из std, но будут ждать асинхронно вместо блокировки потока.
- Barrier - Гарантирует, что несколько задач будут ждать друг друга для достижения точки в программе, прежде чем продолжить выполнение все вместе.
- Mutex - Механизм взаимного исключения, который гарантирует, что не более одного потока в данный момент времени может получить доступ к некоторым данным.
- Notify - Базовое уведомление задачи.
Notifyподдерживает уведомление принимающей задачи без отправки данных. В этом случае задача просыпается и возобновляет обработку. - RwLock - Предоставляет механизм взаимного исключения, который позволяет нескольким читателям одновременно, позволяя только одному писателю за раз. В некоторых случаях это может быть более эффективно, чем мьютекс.
- Semaphore - Ограничивает количество параллелизма. Семафор содержит некоторое количество разрешений, которые задачи могут запрашивать для входа в критическую секцию. Семафоры полезны для реализации ограничений любого рода.
Совместимость со средой выполнения
Все примитивы синхронизации, предоставляемые в этом модуле, не зависят от среды выполнения. Вы можете свободно перемещать их между различными экземплярами среды выполнения Tokio или даже использовать их из сред выполнения, отличных от Tokio.
При использовании в среде выполнения Tokio примитивы синхронизации участвуют в кооперативном планировании, чтобы избежать голодания. Эта функция не применяется при использовании из сред выполнения, отличных от Tokio.
В качестве исключения, методы, оканчивающиеся на _timeout, не являются независимыми от среды выполнения, поскольку они требуют доступа к таймеру Tokio. См. документацию каждого метода *_timeout для получения дополнительной информации о его использовании.
Модули
| Имя | Описание |
|---|---|
broadcast | Многопроизводительная, многопотребительская широковещательная очередь. Каждое отправленное значение видно всем потребителям. |
futures | Именованные типы future. |
mpsc | Многопроизводительная, однопотребительская очередь для отправки значений между асинхронными задачами. |
oneshot | Одноразовый канал используется для отправки одного сообщения между асинхронными задачами. Функция channel используется для создания пары дескрипторов Sender и Receiver, которые образуют канал. |
watch | Многопроизводительный, многопотребительский канал, который сохраняет только последнее отправленное значение. |
Структуры
| Имя | Описание |
|---|---|
AcquireError | Ошибка, возвращаемая функцией Semaphore::acquire. |
Barrier | Барьер позволяет нескольким задачам синхронизировать начало некоторого вычисления. |
BarrierWaitResult | BarrierWaitResult возвращается wait, когда все задачи в Barrier встретились. |
MappedMutexGuard | Дескриптор удерживаемого Mutex, к которому была применена функция через MutexGuard::map. |
Mutex | Асинхронный тип, подобный Mutex. |
MutexGuard | Дескриптор удерживаемого Mutex. Защита может удерживаться через любую точку .await, так как она Send. |
Notify | Уведомляет одну задачу о пробуждении. |
OnceCell | Потокобезопасная ячейка, в которую можно записать только один раз. |
OwnedMappedMutexGuard | Владеемый дескриптор удерживаемого Mutex, к которому была применена функция через OwnedMutexGuard::map. |
OwnedMutexGuard | Владеемый дескриптор удерживаемого Mutex. |
OwnedRwLockMappedWriteGuard | Владеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении. |
OwnedRwLockReadGuard | Владеемая структура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении. |
OwnedRwLockWriteGuard | Владеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении. |
OwnedSemaphorePermit | Владеемое разрешение от семафора. |
RwLock | Асинхронная блокировка читатель-писатель. |
RwLockMappedWriteGuard | Структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении. |
RwLockReadGuard | Структура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении. |
RwLockWriteGuard | Структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении. |
Semaphore | Считающий семафор, выполняющий асинхронное получение разрешений. |
SemaphorePermit | Разрешение от семафора. |
SetOnce | Потокобезопасная ячейка, в которую можно записать только один раз. |
SetOnceError | Ошибка, которая может быть возвращена из SetOnce::set. |
TryLockError | Ошибка, возвращаемая функциями Mutex::try_lock, RwLock::try_read и RwLock::try_write. |
Перечисления
| Имя | Описание |
|---|---|
SetError | Ошибки, которые могут быть возвращены из OnceCell::set. |
TryAcquireError | Ошибка, возвращаемая функцией Semaphore::try_acquire. |
Модуль task
Асинхронные "зеленые потоки" (легковесные задачи).
Что такое задачи?
Задача — это легковесная, неблокирующая единица выполнения. Задача похожа на поток операционной системы, но вместо управления планировщиком ОС, ими управляет среда выполнения Tokio. Другое название этого общего шаблона — зеленые потоки (green threads). Если вы знакомы с горутинами Go, корутинами Kotlin или процессами Erlang, вы можете считать задачи Tokio чем-то подобным.
Ключевые моменты о задачах:
-
Задачи легковесны. Поскольку задачи планируются средой выполнения Tokio, а не операционной системой, создание новых задач или переключение между задачами не требует переключения контекста и имеет довольно низкие накладные расходы. Создание, выполнение и уничтожение большого количества задач достаточно дешево, особенно по сравнению с потоками ОС.
-
Задачи планируются кооперативно. Большинство операционных систем реализуют вытесняющую многозадачность. Это техника планирования, при которой ОС позволяет каждому потоку выполняться в течение периода времени, а затем вытесняет его, временно приостанавливая этот поток и переключаясь на другой. Задачи, напротив, реализуют кооперативную многозадачность. При кооперативной многозадачности задаче разрешено выполняться до тех пор, пока она не уступит (yield), указывая планировщику среды выполнения Tokio, что в данный момент она не может продолжать выполнение. Когда задача уступает, среда выполнения Tokio переключается на выполнение следующей задачи.
-
Задачи неблокирующие. Обычно, когда поток ОС выполняет ввод-вывод или должен синхронизироваться с другим потоком, он блокируется, позволяя ОС запланировать другой поток. Когда задача не может продолжить выполнение, она должна уступить, позволяя среде выполнения Tokio запланировать другую задачу. Задачи, как правило, не должны выполнять системные вызовы или другие операции, которые могут заблокировать поток, поскольку это предотвратит выполнение других задач, работающих в том же потоке. Вместо этого этот модуль предоставляет API для выполнения блокирующих операций в асинхронном контексте.
Работа с задачами
Этот модуль предоставляет следующие API для работы с задачами:
Порождение задач (Spawning)
Возможно, самой важной функцией в этом модуле является task::spawn. Эту функцию можно рассматривать как асинхронный эквивалент thread::spawn из стандартной библиотеки. Она принимает асинхронный блок или другой future и создает новую задачу для конкурентного выполнения этой работы:
#![allow(unused)] fn main() { use tokio::task; task::spawn(async { // выполняем некоторую работу здесь... }); }
Как и std::thread::spawn, task::spawn возвращает структуру JoinHandle. Сам JoinHandle является future, который может быть использован для ожидания результата порожденной задачи. Например:
#![allow(unused)] fn main() { use tokio::task; let join = task::spawn(async { // ... "hello world!" }); // ... // Ожидаем результат порожденной задачи. let result = join.await?; assert_eq!(result, "hello world!"); }
Опять же, как и тип JoinHandle из std::thread, если порожденная задача паникует, ожидание ее JoinHandle вернет JoinError. Например:
#![allow(unused)] fn main() { use tokio::task; let join = task::spawn(async { panic!("something bad happened!") }); // Возвращенный результат указывает, что задача завершилась неудачно. assert!(join.await.is_err()); }
spawn, JoinHandle и JoinError доступны при включенном флаге функции "rt".
Отмена (Cancellation)
Порожденные задачи могут быть отменены с помощью методов JoinHandle::abort или AbortHandle::abort. При вызове одного из этих методов задаче сигнализируют о завершении работы в следующий раз, когда она уступит в точке .await. Если задача уже простаивает, она будет завершена как можно скорее, без повторного запуска перед завершением. Кроме того, завершение работы среды выполнения Tokio (например, возврат из #[tokio::main]) немедленно отменяет все задачи в ней.
При завершении задач выполнение остановится на том .await, на котором она уступила. Все локальные переменные уничтожаются путем запуска их деструкторов. После завершения остановки ожидание JoinHandle завершится ошибкой отмены.
Обратите внимание, что отмена задачи не гарантирует, что она завершится с ошибкой отмены, поскольку она может сначала завершиться нормально. Например, если задача не уступает среде выполнения ни в одной точке между вызовом abort и концом задачи, то JoinHandle вместо этого сообщит, что задача завершилась нормально.
Имейте в виду, что задачи, порожденные с помощью spawn_blocking, не могут быть отменены, потому что они не являются асинхронными. Если вы вызовете abort для задачи spawn_blocking, это не окажет никакого эффекта, и задача продолжит выполняться нормально. Исключение составляет случай, когда задача еще не начала выполняться; в этом случае вызов abort может предотвратить запуск задачи.
Имейте в виду, что вызовы JoinHandle::abort только планируют отмену задачи и возвращаются до завершения отмены. Чтобы дождаться завершения отмены, дождитесь завершения задачи, ожидая JoinHandle. Аналогично, метод JoinHandle::is_finished не возвращает true, пока отмена не завершится.
Многократный вызов JoinHandle::abort имеет тот же эффект, что и однократный вызов.
Tokio также предоставляет AbortHandle, который похож на JoinHandle, за исключением того, что он не предоставляет механизма ожидания завершения задачи. Каждая задача может иметь только один JoinHandle, но она может иметь более одного AbortHandle.
Блокирование и уступка (Blocking and Yielding)
Как мы обсуждали выше, код, выполняющийся в асинхронных задачах, не должен выполнять операции, которые могут блокировать. Блокирующая операция, выполненная в задаче, работающей в потоке, который также выполняет другие задачи, заблокирует весь поток, не позволяя другим задачам выполняться.
Вместо этого Tokio предоставляет два API для выполнения блокирующих операций в асинхронном контексте: task::spawn_blocking и task::block_in_place.
Имейте в виду, что если вы вызываете не-асинхронный метод из асинхронного кода, этот не-асинхронный метод все еще находится внутри асинхронного контекста, поэтому вам также следует избегать там блокирующих операций. Это включает деструкторы объектов, уничтожаемых в асинхронном коде.
spawn_blocking
Функция task::spawn_blocking похожа на функцию task::spawn, обсуждавшуюся в предыдущем разделе, но вместо порождения неблокирующего future в среде выполнения Tokio она порождает блокирующую функцию в выделенном пуле потоков для блокирующих задач. Например:
#![allow(unused)] fn main() { use tokio::task; task::spawn_blocking(|| { // выполняем ресурсоемкую работу или вызываем синхронный код }); }
Так же, как task::spawn, task::spawn_blocking возвращает JoinHandle, который мы можем использовать для ожидания результата блокирующей операции:
#![allow(unused)] fn main() { let join = task::spawn_blocking(|| { // выполняем ресурсоемкую работу или вызываем синхронный код "blocking completed" }); let result = join.await?; assert_eq!(result, "blocking completed"); }
block_in_place
При использовании многопоточной среды выполнения также доступна функция task::block_in_place. Как и task::spawn_blocking, эта функция позволяет запускать блокирующую операцию из асинхронного контекста. Однако, в отличие от spawn_blocking, block_in_place работает путем перевода текущего рабочего потока в блокирующий поток, перемещая другие задачи, выполняющиеся в этом потоке, на другой рабочий поток. Это может повысить производительность, избегая переключений контекста.
Например:
#![allow(unused)] fn main() { use tokio::task; let result = task::block_in_place(|| { // выполняем ресурсоемкую работу или вызываем синхронный код "blocking completed" }); assert_eq!(result, "blocking completed"); }
yield_now
Кроме того, этот модуль предоставляет асинхронную функцию task::yield_now, которая аналогична thread::yield_now из стандартной библиотеки. Вызов и ожидание этой функции заставят текущую задачу уступить планировщику среды выполнения Tokio, позволяя запланировать другие задачи. В конечном итоге уступившая задача будет опрошена снова, что позволит ей выполниться. Например:
#![allow(unused)] fn main() { use tokio::task; async { task::spawn(async { // ... println!("spawned task done!") }); // Уступаем, позволяя сначала выполниться новой задаче. task::yield_now().await; println!("main task done!"); } }
Модули
| Имя | Флаги | Описание |
|---|---|---|
coop | rt | Утилиты для улучшенного кооперативного планирования. |
futures | rt | Future, связанные с задачами. |
join_set | rt | Коллекция задач, порожденных в среде выполнения Tokio. |
Структуры
| Имя | Флаги | Описание |
|---|---|---|
AbortHandle | rt | Владение разрешением на отмену порожденной задачи без ожидания ее завершения. |
Builder | tokio_unstable и tracing | Фабрика, используемая для настройки свойств новой задачи. |
Id | rt | Непрозрачный ID, однозначно идентифицирующий задачу относительно других текущих. |
JoinError | rt | Задача не выполнилась до завершения. |
JoinHandle | rt | Владение разрешением на присоединение к задаче (ожидание ее завершения). |
JoinSet | rt | Коллекция задач, порожденных в среде выполнения Tokio. |
LocalEnterGuard | rt | Контекстная защита для LocalSet. |
LocalKey | rt | Ключ для локальных данных задачи. |
LocalSet | rt | Набор задач, которые выполняются в одном потоке. |
Функции
| Имя | Флаги | Описание |
|---|---|---|
block_in_place | rt-multi-thread | Выполняет предоставленную блокирующую функцию в текущем потоке без блокировки исполнителя. |
id | rt | Возвращает Id текущей выполняемой задачи. |
spawn | rt | Порождает новую асинхронную задачу, возвращая JoinHandle для нее. |
spawn_blocking | rt | Выполняет предоставленное замыкание в потоке, где блокировка допустима. |
spawn_local | rt | Порождает future !Send в текущем LocalSet или LocalRuntime. |
try_id | rt | Возвращает Id текущей выполняемой задачи или None, если вызвано вне задачи. |
yield_now | rt | Уступает выполнение обратно среде выполнения Tokio. |
Модуль time
Доступно только с флагом функции time.
Утилиты для отслеживания времени.
Этот модуль предоставляет ряд типов для выполнения кода по истечении заданного периода времени.
-
Sleep- это future, который не выполняет работы и завершается в определенный момент времениInstant. -
Interval- это поток, выдающий значение с фиксированным периодом. Он инициализируетсяDurationи повторно выдает значение каждый раз, когда длительность истекает. -
Timeout- оборачивает future или поток, устанавливая верхнюю границу времени, которое ему разрешено выполняться. Если future или поток не завершается вовремя, он отменяется и возвращается ошибка.
Этих типов достаточно для обработки большого количества сценариев, связанных со временем.
Эти типы должны использоваться в контексте Runtime.
Примеры
Ожидание 100 мс и вывод сообщения
#![allow(unused)] fn main() { use std::time::Duration; use tokio::time::sleep; sleep(Duration::from_millis(100)).await; println!("100 мс прошло"); }
Требование, чтобы операция занимала не более 1 секунды
#![allow(unused)] fn main() { use tokio::time::{timeout, Duration}; async fn long_future() { // выполняем работу здесь } let res = timeout(Duration::from_secs(1), long_future()).await; if res.is_err() { println!("операция превысила время ожидания"); } }
Простой пример использования interval для выполнения задачи каждые две секунды
Разница между interval и sleep заключается в том, что interval измеряет время с последнего тика, что означает, что .tick().await может ждать меньше времени, чем указанная для интервала длительность, если между вызовами .tick().await прошло некоторое время.
Если в примере ниже заменить tick на sleep, задача будет выполняться только раз в три секунды, а не каждые две секунды.
#![allow(unused)] fn main() { use tokio::time; async fn task_that_takes_a_second() { println!("hello"); time::sleep(time::Duration::from_secs(1)).await } let mut interval = time::interval(time::Duration::from_secs(2)); for _i in 0..5 { interval.tick().await; task_that_takes_a_second().await; } }
Использование таймаута с потоком
#![allow(unused)] fn main() { use tokio::time::{timeout, Duration}; use tokio_stream::{self as stream, StreamExt}; let mut stream = stream::iter(1..=10); let result = timeout(Duration::from_secs(1), async move { while let Some(value) = stream.next().await { println!("обрабатываем значение: {}", value); // Имитация работы tokio::time::sleep(Duration::from_millis(200)).await; } }).await; match result { Ok(_) => println!("поток завершился успешно"), Err(_) => println!("поток превысил время ожидания"), } }
Использование sleep_until для планирования на определенное время
#![allow(unused)] fn main() { use tokio::time::{sleep_until, Instant}; use std::time::Duration; // Запланировать выполнение через 5 секунд от текущего момента let deadline = Instant::now() + Duration::from_secs(5); sleep_until(deadline).await; println!("5 секунд прошло с момента планирования"); }
Реэкспорт
#![allow(unused)] fn main() { pub use std::time::Duration; }
Модули
| Имя | Флаги | Описание |
|---|---|---|
error | Типы ошибок времени. |
Структуры
| Имя | Описание |
|---|---|
Instant | Измерение монотонно неубывающих часов. Непрозрачно и полезно только с Duration. |
Interval | Интервал, возвращаемый interval и interval_at. |
Sleep | Future, возвращаемый sleep и sleep_until. |
Timeout | Future, возвращаемый timeout и timeout_at. |
Перечисления
| Имя | Описание |
|---|---|
MissedTickBehavior | Определяет поведение Interval при пропуске тика. |
Функции
| Имя | Флаги | Описание |
|---|---|---|
advance | test-util | Перемещает время вперед. |
interval | Создает новый Interval, который выдает значения с интервалом period. Первый тик завершается немедленно. Поведение по умолчанию для пропущенных тиков - Burst, но это можно настроить, вызвав set_missed_tick_behavior. | |
interval_at | Создает новый Interval, который выдает значения с интервалом period, причем первый тик завершается в start. Поведение по умолчанию для пропущенных тиков - Burst, но это можно настроить, вызвав set_missed_tick_behavior. | |
pause | test-util | Приостанавливает время. |
resume | test-util | Возобновляет время. |
sleep | Ожидает, пока не пройдет указанная длительность. | |
sleep_until | Ожидает, пока не будет достигнут указанный крайний срок. | |
timeout | Требует, чтобы Future завершился до истечения указанной длительности. | |
timeout_at | Требует, чтобы Future завершился до указанного момента времени. |
Дополнительные примеры
Настройка поведения при пропущенных тиках
#![allow(unused)] fn main() { use tokio::time::{interval, Duration, MissedTickBehavior}; let mut interval = interval(Duration::from_secs(1)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; println!("тик"); // Если эта задача занимает больше 1 секунды, следующие тики будут пропущены // до следующего подходящего момента времени } }
Комбинирование нескольких таймеров
#![allow(unused)] fn main() { use tokio::time::{sleep, timeout, Duration}; async fn complex_operation() -> Result<(), Box<dyn std::error::Error>> { // Ожидание 500 мс sleep(Duration::from_millis(500)).await; // Выполнение операции с таймаутом 2 секунды let result = timeout(Duration::from_secs(2), async { // Длительная операция sleep(Duration::from_secs(1)).await; "результат операции" }).await?; println!("{}", result); Ok(()) } }
Крейт futures_concurrency
Производительные, переносимые, структурированные операции конкурентности для асинхронного Rust. Работает с любой средой выполнения, не стирает времена жизни, всегда обрабатывает отмену и всегда возвращает результат вызывающей стороне.
futures-concurrency предоставляет операции конкурентности как для групп future, так и для stream. Как для ограниченных, так и для неограниченных наборов future и stream. В обоих случаях производительность должна быть на уровне, если не превосходить, традиционных реализаций исполнителей.
Примеры
Ожидание нескольких future разных типов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::future; let a = future::ready(1u8); let b = future::ready("hello"); let c = future::ready(3u16); assert_eq!((a, b, c).join().await, (1, "hello", 3)); }
Конкурентная обработка элементов в коллекции
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let v: Vec<_> = vec!["chashu", "nori"] .into_co_stream() .map(|msg| async move { format!("hello {msg}") }) .collect() .await; assert_eq!(v, &["hello chashu", "hello nori"]); }
Доступ к данным стека вне области видимости future
Адаптировано из std::thread::scope.
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let mut container = vec![1, 2, 3]; let mut num = 0; let a = async { println!("hello from the first future"); dbg!(&container); }; let b = async { println!("hello from the second future"); num += container[0] + container[2]; }; println!("hello from the main future"); let _ = (a, b).join().await; container.push(4); assert_eq!(num, container.len()); }
Операции
Future
Для future, которые возвращают обычный тип T, доступны только операции join и race. join ожидает завершения всех future, тогда как race ожидает завершения первого future. Однако для future, которые возвращают Try<Output = T>, доступны две дополнительные операции. Следующая таблица описывает поведение операций конкурентности для ошибочных future:
| Ожидание всех результатов | Ожидание первого результата | |
|---|---|---|
| Продолжать при ошибке | Future::join | Future::race_ok |
| Прерывать при ошибке | Future::try_join | Future::race |
Следующие реализации future предоставляются futures-concurrency:
FutureGroup: Расширяемая группа future, работающих как единое целое.tuple:join,try_join,race,race_okarray:join,try_join,race,race_okVec:join,try_join,race,race_ok
Stream
Stream выдают результаты по одному, что означает, что решение о прекращении итерации одинаково для ошибочных и безошибочных потоков. Операции, предоставляемые для потоков, можно классифицировать на основе того, могут ли их входные данные оцениваться конкурентно и могут ли их выходные данные обрабатываться конкурентно.
Конкретно в случае merge, он принимает N потоков и выдает элементы по одному, как только они становятся доступными. Это позволяет конкурентно обрабатывать выходные данные отдельных потоков последующими операциями.
| Последовательная обработка вывода | Конкурентная обработка вывода | |
|---|---|---|
| Последовательная оценка ввода | Stream::chain | пока недоступно ‡ |
| Конкурентная оценка ввода | Stream::zip | Stream::merge |
‡: Это можно было бы решить с помощью гипотетической операции Stream::unzip, однако, поскольку мы стремимся к семантической совместимости с std::iter::Iterator в наших операциях, путь к её добавлению в настоящее время неясен.
Следующие реализации потоков предоставляются futures-concurrency:
StreamGroup: Расширяемая группа потоков, работающих как единое целое.ConcurrentStream: Трейт для асинхронных потоков, которые могут конкурентно обрабатывать элементы.tuple:chain,merge,ziparray:chain,merge,zipVec:chain,merge,zip
Поддержка сред выполнения
futures-concurrency не зависит от наличия какого-либо исполнителя среды выполнения. Это позволяет ему работать "из коробки" с любой асинхронной средой выполнения, включая: tokio, async-std, smol, glommio и monoio. Он также поддерживает среды #[no_std], что позволяет использовать его со встроенными асинхронными средами выполнения, такими как embassy.
Флаги функций
Флаг функции std включен по умолчанию. Для нацеливания на среды alloc или no_std вы можете использовать следующую конфигурацию:
[dependencies]
# no_std
futures-concurrency = { version = "7.5.0", default-features = false }
# alloc
futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }
Дополнительное чтение
futures-concurrency разрабатывался в течение нескольких лет. Основной сопровождающий — Йош Вуйтс (Yosh Wuyts), член рабочей группы по асинхронности Rust (Rust Async WG). Подробнее о разработке и идеях, стоящих за futures-concurrency, можно прочитать здесь:
| Ссылка | Краткое описание темы |
|---|---|
| Tree Structured Concurrency - What is structured concurrency | Введение в структурированную конкурентность и её основные концепции |
| Tree Structured Concurrency - Unstructured concurrency example | Пример неструктурированной конкурентности и её проблемы |
| Tree Structured Concurrency - Structured concurrency example | Пример структурированной конкурентности и её преимущества |
| Tree Structured Concurrency - What's the worst that can happen | Потенциальные проблемы неструктурированной конкурентности |
| Tree Structured Concurrency - Applying to your programs | Практическое применение структурированной конкурентности |
| Tree Structured Concurrency - Managed background tasks | Паттерн управления фоновыми задачами |
| Tree Structured Concurrency - Guaranteeing structure | Гарантии структурированной конкурентности |
| Tree Structured Concurrency - Conclusion | Выводы и заключение по структурированной конкурентности |
| Tree Structured Concurrency - Dijkstra | Ссылки на работы Дейкстры о структурированном программировании |
| Why Async Rust | Обоснование использования асинхронного Rust |
| Async Cancellation | Отмена асинхронных операций в Rust |
| EWD Notes on Structured Programming | Классические заметки Дейкстры о структурированном программировании |
| Graphs and Trees | Графы и деревья в компьютерных науках |
| Tree Structured Concurrency - Academia | Академические источники по структурированной конкурентности |
| Tree Structured Concurrency - Recursion | Рекурсия в контексте структурированной конкурентности |
| futures-concurrency docs | Документация крейта futures-concurrency |
| Tree Structured Concurrency - SQL | Аналогии с SQL в структурированной конкурентности |
| smol::Task docs | Документация по Task в крейте smol |
| smol::Task::cancel | Метод отмены задач в smol |
| Tree Structured Concurrency - Flush | Операция flush в контексте конкурентности |
| async_std::BufWriter docs | Буферизованный писатель в async-std |
| tasky implementation | Пример реализации структурированных задач |
| Scoped Tasks | Задачи с ограниченной областью видимости |
| Linearity and Control | Линейность и управление в программировании |
| async-task-group docs | Документация крейта для группировки асинхронных задач |
| Tree Structured Concurrency - State Machine | Структурированные конечные автоматы |
| Google's Rust Journey | Опыт Google по внедрению Rust в 2022 году |
Модули
| Имя | Описание |
|---|---|
array | Вспомогательные функции и типы для массивов фиксированной длины. |
concurrent_stream | Конкурентное выполнение потоков. |
future | Базовая асинхронная функциональность. |
prelude | Предел конкурентности future. |
stream | Компонуемая асинхронная итерация. |
vec | Типы параллельных итераторов для векторов (Vec<T>). |
Модули
Модуль array
Вспомогательные функции и типы для массивов фиксированной длины.
Структуры
| Имя | Описание |
|---|---|
AggregateError | Коллекция ошибок. |
Chain | Поток, который объединяет несколько потоков один за другим. |
Join | Future, который ожидает завершения двух future схожего типа. |
Merge | Поток, который объединяет несколько потоков в один поток. |
Race | Future, который ожидает завершения первого future. |
RaceOk | Future, который ожидает завершения первого успешного future. |
TryJoin | Future, который ожидает успешного завершения всех future или досрочно прерывается при ошибке. |
Zip | Поток, который "объединяет" несколько потоков в один поток пар. |
Примеры использования
Использование Join для ожидания нескольких future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::future; let a = future::ready(1); let b = future::ready(2); let c = future::ready(3); let result = [a, b, c].join().await; assert_eq!(result, [1, 2, 3]); }
Использование TryJoin для обработки ошибок
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let a = async { Ok::<i32, &str>(1) }; let b = async { Ok::<i32, &str>(2) }; let c = async { Err::<i32, &str>("error") }; let result = [a, b, c].try_join().await; assert!(result.is_err()); }
Использование Race для гонки future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let fast = async { 1 }; let slow = async { sleep(Duration::from_secs(1)).await; 2 }; let result = [fast, slow].race().await; assert_eq!(result, 1); }
Использование Merge для объединения потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let stream1 = stream::iter(vec![1, 2]); let stream2 = stream::iter(vec![3, 4]); let mut merged = [stream1, stream2].merge(); let mut results = Vec::new(); while let Some(value) = merged.next().await { results.push(value); } results.sort(); assert_eq!(results, vec![1, 2, 3, 4]); }
Использование Zip для параллельной обработки потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let stream1 = stream::iter(vec![1, 2, 3]); let stream2 = stream::iter(vec![4, 5, 6]); let mut zipped = [stream1, stream2].zip(); let mut results = Vec::new(); while let Some(values) = zipped.next().await { results.push(values); } assert_eq!(results, vec![[1, 4], [2, 5], [3, 6]]); }
Особенности работы с массивами
Модуль array предоставляет специализированные реализации для массивов фиксированной длины, что позволяет эффективно работать с известным на этапе компиляции количеством элементов. Это обеспечивает лучшую производительность и безопасность типов по сравнению с динамическими коллекциями.
Модуль concurrent_stream
Конкурентное выполнение потоков.
Примеры
Конкурентная обработка элементов в коллекции
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let v: Vec<_> = vec!["chashu", "nori"] .into_co_stream() .map(|msg| async move { format!("hello {msg}") }) .collect() .await; assert_eq!(v, &["hello chashu", "hello nori"]); }
Конкурентная обработка элементов в потоке
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_lite::stream; let v: Vec<_> = stream::repeat("chashu") .co() .take(2) .map(|msg| async move { format!("hello {msg}") }) .collect() .await; assert_eq!(v, &["hello chashu", "hello chashu"]); }
Дополнительные примеры использования
Ограничение количества одновременных операций
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio::time::{sleep, Duration}; let items = vec![1, 2, 3, 4, 5]; let results: Vec<_> = items .into_co_stream() .limit(2) // Ограничиваем до 2 одновременных операций .map(|n| async move { sleep(Duration::from_millis(100)).await; n * 2 }) .collect() .await; assert_eq!(results, vec![2, 4, 6, 8, 10]); }
Использование enumerate для получения индексов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let items = vec!["a", "b", "c"]; let results: Vec<_> = items .into_co_stream() .enumerate() .map(|(index, item)| async move { format!("{index}: {item}") }) .collect() .await; assert_eq!(results, vec!["0: a", "1: b", "2: c"]); }
Селективное взятие элементов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let items = vec![1, 2, 3, 4, 5]; let results: Vec<_> = items .into_co_stream() .take(3) // Берем только первые 3 элемента .map(|n| async move { n * 2 }) .collect() .await; assert_eq!(results, vec![2, 4, 6]); }
Структуры
| Имя | Описание |
|---|---|
Enumerate | Конкурентный итератор, который выдает текущий счетчик и элемент во время итерации. |
FromStream | Конкурентная реализация for each из Stream. |
Limit | Конкурентный итератор, который ограничивает количество применяемого параллелизма. |
Map | Преобразует элементы из одного типа в другой. |
Take | Конкурентный итератор, который итерируется только по первым n итерациям. |
Перечисления
| Имя | Описание |
|---|---|
ConsumerState | Состояние потребителя, используется для обратной связи с источником. |
Трейты
| Имя | Описание |
|---|---|
ConcurrentStream | Конкурентная работа с элементами в потоке. |
Consumer | Описывает тип, который может получать данные. |
FromConcurrentStream | Преобразование из ConcurrentStream. |
IntoConcurrentStream | Преобразование в ConcurrentStream. |
Практическое применение
Обработка HTTP запросов с ограничением параллелизма
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use reqwest::Client; async fn fetch_url(url: &str) -> Result<String, reqwest::Error> { let client = Client::new(); let response = client.get(url).send().await?; response.text().await } let urls = vec![ "https://httpbin.org/get", "https://httpbin.org/ip", "https://httpbin.org/user-agent", ]; let results: Vec<Result<String, _>> = urls .into_co_stream() .limit(2) // Максимум 2 одновременных запроса .map(|url| async move { fetch_url(url).await }) .collect() .await; }
Параллельная обработка файлов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio::fs; async fn process_file(path: &str) -> Result<usize, std::io::Error> { let content = fs::read_to_string(path).await?; Ok(content.len()) } let files = vec!["file1.txt", "file2.txt", "file3.txt"]; let sizes: Vec<Result<usize, _>> = files .into_co_stream() .map(|file| async move { process_file(file).await }) .collect() .await; }
Преимущества конкурентных потоков
- Эффективное использование ресурсов: Операции выполняются конкурентно, а не последовательно
- Контроль параллелизма: Возможность ограничивать количество одновременных операций
- Композиционность: Легко комбинировать с другими операциями потоков
- Безопасность типов: Статическая проверка типов на этапе компиляции
Модуль future
Базовая асинхронная функциональность.
Пожалуйста, ознакомьтесь с ключевыми словами async и await и книгой по асинхронному программированию для получения дополнительной информации об асинхронном программировании в Rust.
Примеры
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_lite::future::block_on; use std::future; block_on(async { // Ожидание нескольких future одинакового типа. let a = future::ready(1); let b = future::ready(2); let c = future::ready(3); assert_eq!([a, b, c].join().await, [1, 2, 3]); // Ожидание нескольких future разных типов. let a = future::ready(1u8); let b = future::ready("hello"); let c = future::ready(3u16); assert_eq!((a, b, c).join().await, (1, "hello", 3)); // Это также работает с векторами future, предоставляя альтернативу // `join_all` из futures-rs. let a = future::ready(1); let b = future::ready(2); let c = future::ready(3); assert_eq!(vec![a, b, c].join().await, vec![1, 2, 3]); }) }
Конкурентность
Часто операции зависят от результата нескольких future. Вместо последовательного ожидания каждого future может быть более эффективным ожидать их конкурентно. Rust предоставляет встроенные механизмы в библиотеке, чтобы сделать это простым и удобным.
Безошибочная конкурентность
При работе с future, которые не возвращают типы Result, мы предоставляем две встроенные операции конкурентности:
future::Merge: ожидать завершения всех future в набореfuture::Race: ожидать завершения первого future в наборе
Поскольку future можно рассматривать как асинхронную последовательность из одного элемента, см. раздел конкурентности асинхронных итераторов для дополнительных операций асинхронной конкурентности.
Ошибочная конкурентность
При работе с future, которые возвращают типы Result, значение существующих операций меняется, и становятся доступными дополнительные операции конкурентности, учитывающие Result:
| Ожидание всех результатов | Ожидание первого результата | |
|---|---|---|
| Продолжать при ошибке | future::Merge | future::RaceOk |
| Прерывать при ошибке | future::TryMerge | future::Race |
future::TryMerge: ожидать успешного завершения всех future в наборе или вернуться при первой ошибке.future::RaceOk: ожидать завершения первого успешного future в наборе или вернутьErr, если ни один future не завершился успешно.
Дополнительные примеры
Использование TryMerge с ошибочными future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let success = async { Ok::<i32, &str>(42) }; let error = async { Err::<i32, &str>("something went wrong") }; let another_success = async { Ok::<i32, &str>(100) }; let result = [success, error, another_success].try_join().await; assert!(result.is_err()); // Прерывается на первой ошибке }
Использование RaceOk для получения первого успешного результата
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let fast_error = async { sleep(Duration::from_millis(10)).await; Err::<i32, &str>("fast error") }; let slow_success = async { sleep(Duration::from_millis(50)).await; Ok::<i32, &str>(42) }; let medium_success = async { sleep(Duration::from_millis(30)).await; Ok::<i32, &str>(100) }; let result = [fast_error, slow_success, medium_success].race_ok().await; assert_eq!(result, Ok(100)); // Первый успешный результат }
Использование FutureGroup для динамического управления future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_concurrency::future::FutureGroup; let mut group = FutureGroup::new(); // Добавляем future в группу group.push(async { 1 }); group.push(async { 2 }); group.push(async { 3 }); // Ожидаем завершения всех future в группе let results = group.join().await; assert_eq!(results, vec![1, 2, 3]); }
Практический пример с сетевыми запросами
use futures_concurrency::prelude::*; use reqwest::Client; async fn fetch_data(client: &Client, url: &str) -> Result<String, reqwest::Error> { let response = client.get(url).send().await?; response.text().await } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = Client::new(); let urls = vec![ "https://httpbin.org/ip", "https://httpbin.org/user-agent", "https://httpbin.org/headers" ]; // Параллельное выполнение всех запросов let futures: Vec<_> = urls.iter() .map(|url| fetch_data(&client, url)) .collect(); let results = futures.try_join().await?; for (i, result) in results.iter().enumerate() { println!("Ответ {}: {} символов", i, result.len()); } Ok(()) }
Модули
| Имя | Описание |
|---|---|
future_group | Расширяемая группа future, которые действуют как единое целое. |
Структуры
| Имя | Описание |
|---|---|
FutureGroup | Расширяемая группа future, которые действуют как единое целое. |
WaitUntil | Приостанавливает future до указанного крайнего срока. |
Трейты
| Имя | Описание |
|---|---|
FutureExt | Трейт-расширение для трейта Future. |
Join | Ожидание завершения всех future. |
Race | Ожидание завершения первого future. |
RaceOk | Ожидание завершения первого успешного future. |
TryJoin | Ожидание успешного завершения всех future или досрочное прерывание при ошибке. |
Ключевые преимущества
- Типобезопасность: Статическая проверка типов на этапе компиляции
- Гибкость: Работа с future разных типов и структур данных
- Эффективность: Конкурентное выполнение вместо последовательного
- Композиционность: Легкое комбинирование операций
- Обработка ошибок: Различные стратегии для работы с ошибочными сценариями
Модуль prelude
Предел (prelude) конкурентности futures.
Реэкспорт
#![allow(unused)] fn main() { pub use super::future::FutureExt as _; pub use super::stream::StreamExt as _; pub use super::future::Join as _; pub use super::future::Race as _; pub use super::future::RaceOk as _; pub use super::future::TryJoin as _; pub use super::stream::Chain as _; pub use super::stream::IntoStream as _; pub use super::stream::Merge as _; pub use super::stream::Zip as _; pub use super::concurrent_stream::ConcurrentStream; pub use super::concurrent_stream::FromConcurrentStream; pub use super::concurrent_stream::IntoConcurrentStream; }
Описание
Модуль prelude предоставляет удобный способ импортировать все основные трейты и типы futures-concurrency одним оператором use. Это стандартная практика в экосистеме Rust для упрощения импорта часто используемых компонентов.
Использование
Добавьте следующую строку в начало вашего файла для импорта всех основных компонентов futures-concurrency:
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; }
Что включается в предел
Трейты для Future
FutureExt- методы расширения дляFutureJoin- ожидание завершения всех futureRace- ожидание завершения первого futureRaceOk- ожидание завершения первого успешного futureTryJoin- ожидание успешного завершения всех future или досрочное прерывание при ошибке
Трейты для Stream
StreamExt- методы расширения дляStreamChain- объединение потоковIntoStream- преобразование в потокMerge- слияние потоковZip- объединение потоков в пары
Трейты для конкурентных потоков
ConcurrentStream- конкурентная работа с элементами в потокеFromConcurrentStream- преобразование из конкурентного потокаIntoConcurrentStream- преобразование в конкурентный поток
Примеры использования
Базовое использование
use futures_concurrency::prelude::*; use std::future; #[tokio::main] async fn main() { // Использование методов из FutureExt let a = future::ready(1); let b = future::ready(2); // Метод join() доступен благодаря импорту трейта Join let result = (a, b).join().await; assert_eq!(result, (1, 2)); }
Работа с потоками
use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; #[tokio::main] async fn main() { let stream1 = stream::iter(vec![1, 2]); let stream2 = stream::iter(vec![3, 4]); // Метод merge() доступен благодаря импорту трейта Merge let merged = (stream1, stream2).merge(); let results: Vec<_> = merged.collect().await; // Порядок элементов может варьироваться из-за природы merge assert!(results.contains(&1) && results.contains(&4)); }
Конкурентные потоки
use futures_concurrency::prelude::*; #[tokio::main] async fn main() { let items = vec!["hello", "world"]; // Преобразование в конкурентный поток let results: Vec<String> = items .into_co_stream() // Доступно благодаря IntoConcurrentStream .map(|s| async move { s.to_uppercase() }) .collect() // Доступно благодаря ConcurrentStream .await; assert_eq!(results, vec!["HELLO", "WORLD"]); }
Преимущества использования prelude
- Удобство: Один импорт вместо множества отдельных
- Согласованность: Стандартизированный способ импорта
- Полнота: Все основные компоненты доступны сразу
- Обновляемость: Новые функции автоматически включаются в предел
Примечание
Импорт с as _ означает, что трейты импортируются только для их побочных эффектов (добавления методов к существующим типам), но сами имена трейтов не добавляются в область видимости. Это предотвращает конфликты имен.
Например, после use futures_concurrency::prelude::*; вы можете использовать методы типа join() на кортежах future, но не можете напрямую ссылаться на тип Join.
Модуль stream
Компонуемая асинхронная итерация.
Примеры
Объединение нескольких потоков для обработки значений сразу по готовности
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_lite::stream::{self, StreamExt}; use futures_lite::future::block_on; block_on(async { let a = stream::once(1); let b = stream::once(2); let c = stream::once(3); let s = (a, b, c).merge(); let mut counter = 0; s.for_each(|n| counter += n).await; assert_eq!(counter, 6); }) }
Конкурентность
При работе с несколькими (асинхронными) итераторами порядок, в котором итераторы ожидаются, важен. Как часть асинхронных итераторов, Rust предоставляет встроенные операции для управления порядком выполнения наборов итераторов:
- merge: объединяет несколько итераторов в один итератор, где новый итератор выдает элемент, как только он становится доступен из одного из базовых итераторов.
- zip: объединяет несколько итераторов в итератор пар. Базовые итераторы будут ожидаться конкурентно.
- chain: итерируется по нескольким итераторам последовательно. Следующий итератор в последовательности не начнется, пока предыдущий итератор не завершится.
Future
Future можно рассматривать как асинхронные последовательности из одного элемента. Используя stream::once, future можно преобразовать в асинхронные итераторы и затем использовать с любыми методами конкурентности итераторов. Это позволяет использовать такие операции, как stream::Merge, для конкурентного выполнения наборов future, но получать выходные данные отдельных future, как только они становятся доступны.
См. документацию по конкурентности future для получения дополнительной информации о конкурентности future.
Дополнительные примеры
Использование zip для параллельной обработки потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let names = stream::iter(vec!["Alice", "Bob", "Charlie"]); let ages = stream::iter(vec![25, 30, 35]); let zipped = (names, ages).zip(); let mut results = Vec::new(); while let Some((name, age)) = zipped.next().await { results.push(format!("{} is {} years old", name, age)); } assert_eq!(results, vec![ "Alice is 25 years old", "Bob is 30 years old", "Charlie is 35 years old" ]); }
Использование chain для последовательного соединения потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let first = stream::iter(vec![1, 2]); let second = stream::iter(vec![3, 4]); let third = stream::iter(vec![5, 6]); let chained = (first, second, third).chain(); let results: Vec<_> = chained.collect().await; assert_eq!(results, vec![1, 2, 3, 4, 5, 6]); }
Использование StreamGroup для динамического управления потоками
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_concurrency::stream::StreamGroup; use tokio_stream::{self as stream, StreamExt}; let mut group = StreamGroup::new(); // Добавляем потоки в группу group.push(stream::iter(vec![1, 2])); group.push(stream::iter(vec![3, 4])); // Объединяем все потоки в группе let mut merged = group.merge(); let mut results = Vec::new(); while let Some(value) = merged.next().await { results.push(value); } results.sort(); assert_eq!(results, vec![1, 2, 3, 4]); }
Практический пример с обработкой событий
use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; use tokio::time::{interval, Duration}; use std::pin::Pin; #[tokio::main] async fn main() { // Создаем несколько потоков событий let user_events = stream::iter(vec!["login", "purchase", "logout"]); let system_events = stream::iter(vec!["startup", "shutdown"]); let timer_events = { let mut interval = interval(Duration::from_secs(1)); stream::unfold((), move |_| { let interval = Pin::new(&mut interval); async move { interval.tick().await; Some(("tick", ())) } }) }; // Объединяем все потоки событий let all_events = (user_events, system_events, timer_events.take(2)).merge(); let mut event_count = 0; all_events .for_each(|event| { event_count += 1; println!("Обработано событие: {:?}", event); async {} }) .await; println!("Всего обработано событий: {}", event_count); }
Модули
| Имя | Описание |
|---|---|
stream_group | Расширяемая группа потоков, которые действуют как единое целое. |
Структуры
| Имя | Описание |
|---|---|
StreamGroup | Расширяемая группа потоков, которые действуют как единое целое. |
WaitUntil | Задерживает выполнение потока один раз на указанную длительность. |
Трейты
| Имя | Описание |
|---|---|
Chain | Берет несколько потоков и создает новый поток для всех последовательно. |
IntoStream | Преобразование в Stream. |
Merge | Объединяет несколько потоков в один поток всех их выходных данных. |
StreamExt | Трейт-расширение для трейта Stream. |
Zip | "Объединяет" несколько потоков в один поток пар. |
Ключевые особенности
- Композиционность: Легко комбинировать различные операции с потоками
- Конкурентность: Эффективное управление несколькими потоками данных
- Безопасность типов: Статическая проверка на этапе компиляции
- Гибкость: Поддержка различных структур данных и паттернов использования
- Эффективность: Минимизация накладных расходов при работе с потоками
Модуль vec
Типы параллельных итераторов для векторов (Vec<T>)
Вам редко понадобится взаимодействовать с этим модулем напрямую, если только вам не нужно указать имя одного из типов итераторов.
Структуры
| Имя | Описание |
|---|---|
AggregateError | Коллекция ошибок. |
Chain | Поток, который объединяет несколько потоков один за другим. |
IntoConcurrentStream | Конкурентный асинхронный итератор, который перемещается из вектора. |
Join | Future, который ожидает завершения нескольких future. |
Merge | Поток, который объединяет несколько потоков в один поток. |
Race | Future, который ожидает завершения первого future. |
RaceOk | Future, который ожидает завершения первого успешного future. |
TryJoin | Future, который ожидает успешного завершения всех future или досрочно прерывается при ошибке. |
Zip | Поток, который "объединяет" несколько потоков в один поток пар. |
Примеры использования
Базовое использование с векторами
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let futures = vec![ async { 1 }, async { 2 }, async { 3 }, ]; let results = futures.join().await; assert_eq!(results, vec![1, 2, 3]); }
Конкурентная обработка элементов вектора
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio::time::{sleep, Duration}; let items = vec!["hello", "world", "rust"]; let results: Vec<String> = items .into_co_stream() // Преобразование вектора в конкурентный поток .map(|s| async move { sleep(Duration::from_millis(10)).await; s.to_uppercase() }) .collect() .await; assert_eq!(results, vec!["HELLO", "WORLD", "RUST"]); }
Обработка ошибок с векторами future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let futures = vec![ async { Ok::<i32, &str>(1) }, async { Err::<i32, &str>("error occurred") }, async { Ok::<i32, &str>(3) }, ]; let result = futures.try_join().await; assert!(result.is_err()); }
Гонка future в векторе
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let futures = vec![ async { sleep(Duration::from_millis(100)).await; 1 }, async { sleep(Duration::from_millis(50)).await; 2 }, async { sleep(Duration::from_millis(10)).await; 3 }, ]; let winner = futures.race().await; assert_eq!(winner, 3); // Самый быстрый future }
Объединение потоков из векторов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let streams = vec![ stream::iter(vec![1, 2]), stream::iter(vec![3, 4]), stream::iter(vec![5, 6]), ]; let mut merged = streams.merge(); let mut results = Vec::new(); while let Some(value) = merged.next().await { results.push(value); } results.sort(); assert_eq!(results, vec![1, 2, 3, 4, 5, 6]); }
Практический пример: параллельные HTTP запросы
use futures_concurrency::prelude::*; use reqwest::Client; async fn fetch_url(client: &Client, url: &str) -> Result<String, reqwest::Error> { let response = client.get(url).send().await?; response.text().await } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = Client::new(); let urls = vec![ "https://httpbin.org/ip", "https://httpbin.org/user-agent", "https://httpbin.org/headers", ]; // Создаем вектор future let fetch_futures: Vec<_> = urls .into_iter() .map(|url| fetch_url(&client, url)) .collect(); // Параллельное выполнение всех запросов let results = fetch_futures.try_join().await?; for (i, content) in results.iter().enumerate() { println!("Запрос {}: {} байт", i, content.len()); } Ok(()) }
Использование RaceOk с вектором ошибочных операций
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let operations = vec![ async { sleep(Duration::from_millis(50)).await; Err::<i32, &str>("slow error") }, async { sleep(Duration::from_millis(10)).await; Ok::<i32, &str>(42) }, async { sleep(Duration::from_millis(30)).await; Ok::<i32, &str>(100) }, ]; let result = operations.race_ok().await; assert_eq!(result, Ok(42)); // Первый успешный результат }
Особенности работы с векторами
Модуль vec предоставляет специализированные реализации для типа Vec<T>, что обеспечивает:
- Эффективность: Оптимизированные реализации для динамических коллекций
- Гибкость: Работа с переменным количеством элементов
- Удобство: Простое преобразование существующих векторов
- Интеграция: Легкое взаимодействие с другими частями стандартной библиотеки
Примечание по использованию
Хотя вы можете напрямую использовать типы из этого модуля, в большинстве случаев достаточно импорта futures_concurrency::prelude::* и использования методов напрямую на векторах:
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; // Вместо прямого использования vec::Join: // let results = vec::Join::new(futures).await; // Просто используйте метод на векторе: let results = futures.join().await; }
Крейт futures_lite
Источник: Документация futures_lite
Фьючерсы, потоки и комбинаторы для асинхронного ввода-вывода.
Описание
Этот крейт представляет собой подмножество futures, который компилируется на порядок быстрее, исправляет мелкие недостатки в его API, заполняет некоторые очевидные пробелы и удаляет почти весь небезопасный код из него.
Короче говоря, этот крейт стремится быть более удобным, чем futures, но при этом полностью совместимым с ним.
API этого крейта намеренно ограничен. Пожалуйста, ознакомьтесь со списком функций для API, которые исключены из этого крейта.
Примеры
use futures_lite::future; fn main() { future::block_on(async { println!("Hello world!"); }) }
Реэкспорты
| Трейт | Модуль | Описание |
|---|---|---|
AsyncBufRead | io | Асинхронное чтение с буферизацией |
AsyncBufReadExt | io | Расширения для AsyncBufRead |
AsyncRead | io | Асинхронное чтение |
AsyncReadExt | io | Расширения для AsyncRead |
AsyncSeek | io | Асинхронный поиск |
AsyncSeekExt | io | Расширения для AsyncSeek |
AsyncWrite | io | Асинхронная запись |
AsyncWriteExt | io | Расширения для AsyncWrite |
Future | future | Базовый трейт для асинхронных вычислений |
FutureExt | future | Расширения для Future |
Stream | stream | Поток значений |
StreamExt | stream | Расширения для Stream |
Модули
| Модуль | Описание |
|---|---|
future | Комбинаторы для трейта Future |
io | Инструменты и комбинаторы для ввода-вывода |
prelude | Трейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения |
stream | Комбинаторы для трейта Stream |
Макросы
| Макрос | Описание |
|---|---|
pin | Закрепляет переменную типа T в стеке и перепривязывает её как Pin<&mut T> |
ready | Разворачивает Poll<T> или возвращает Pending |
Преимущества перед futures
Скорость компиляции
- На порядок быстрее: Значительно ускоренная компиляция
- Минимальные зависимости: Меньше зависимостей для компиляции
Улучшения API
- Исправленные недостатки: Устранены мелкие проблемы оригинального API
- Заполненные пробелы: Добавлены отсутствующие, но полезные функции
- Более интуитивный дизайн: Улучшенный пользовательский опыт
Безопасность
- Минимум unsafe кода: Почти полное отсутствие небезопасного кода
- Надежность: Повышенная безопасность выполнения
Совместимость
Полная совместимость
- Интероперабельность: Полная совместимость с крейтом
futures - Миграция: Легкий переход с
futuresнаfutures_lite
Ограниченный API
- Целевой подход: Только наиболее часто используемые функции
- Чистота дизайна: Отсутствие перегруженности функциями
Типичное использование
Базовый асинхронный код
#![allow(unused)] fn main() { use futures_lite::future; use futures_lite::stream::StreamExt; future::block_on(async { // Асинхронные операции }); }
Работа с I/O
#![allow(unused)] fn main() { use futures_lite::io::{AsyncReadExt, AsyncWriteExt}; use futures_lite::future; // Асинхронные операции чтения/записи }
Потоки данных
#![allow(unused)] fn main() { use futures_lite::stream::{self, StreamExt}; let mut stream = stream::iter(vec![1, 2, 3]); while let Some(item) = stream.next().await { println!("{}", item); } }
Рекомендации по использованию
Когда использовать futures_lite
- Новые проекты: Для быстрого старта
- Производительность компиляции: Когда важна скорость сборки
- Простота: Для проектов, не требующих полного функционала
futures
Когда использовать futures
- Специфичные функции: При необходимости функций, отсутствующих в
futures_lite - Существующие проекты: При миграции существующего кода
- Расширенные сценарии: Для сложных случаев использования
Крейт async_io
Источник: Документация async_io
Асинхронный ввод-вывод и таймеры.
Описание
Этот крейт предоставляет два основных инструмента:
Async- адаптер для стандартных сетевых типов (и многих других типов) для использования в асинхронных программахTimer- фьючерс или поток, который эмитирует события по времени
Для конкретных асинхронных сетевых типов, построенных на основе этого крейта, см. async-net.
Реализация
При первом использовании Async или Timer создается поток с именем "async-io". Цель этого потока - ожидать события ввода-вывода, о которых сообщает операционная система, а затем пробуждать соответствующие фьючерсы, заблокированные на операциях ввода-вывода или таймерах, когда их можно возобновить.
Для ожидания следующего события ввода-вывода поток "async-io" использует:
- epoll на Linux/Android/illumos
- kqueue на macOS/iOS/BSD
- event ports на illumos/Solaris
- IOCP на Windows
Эта функциональность предоставляется крейтом polling.
Однако обратите внимание, что вы также можете обрабатывать события ввода-вывода и пробуждать фьючерсы на любом потоке с помощью функции block_on(). Таким образом, поток "async-io" является лишь резервным механизмом обработки событий ввода-вывода на случай, если других потоков нет.
Примеры
Подключение к example.com:80 с таймаутом 10 секунд:
#![allow(unused)] fn main() { use async_io::{Async, Timer}; use futures_lite::{future::FutureExt, io}; use std::net::{TcpStream, ToSocketAddrs}; use std::time::Duration; let addr = "example.com:80".to_socket_addrs()?.next().unwrap(); let stream = Async::<TcpStream>::connect(addr).or(async { Timer::after(Duration::from_secs(10)).await; Err(io::ErrorKind::TimedOut.into()) }) .await?; }
Модули
| Модуль | Описание |
|---|---|
os | Платформо-специфичная функциональность |
Структуры
| Структура | Описание |
|---|---|
Async | Асинхронный адаптер для I/O типов |
Readable | Фьючерс для Async::readable |
ReadableOwned | Фьючерс для Async::readable_owned |
Timer | Фьючерс или поток, который эмитирует события по времени |
Writable | Фьючерс для Async::writable |
WritableOwned | Фьючерс для Async::writable_owned |
Трейты
| Трейт | Описание |
|---|---|
IoSafe | Типы, реализации I/O трейтов которых не удаляют базовый I/O источник |
Функции
| Функция | Описание |
|---|---|
block_on | Блокирует текущий поток на фьючерсе, обрабатывая I/O события в простое |
Ключевые особенности
Архитектура
- Фоновый поток: "async-io" поток для обработки событий
- Кросс-платформенность: Единый API для разных операционных систем
- Резервный механизм: Основная обработка может происходить в любом потоке
Использование Async адаптера
#![allow(unused)] fn main() { use async_io::Async; use std::net::TcpStream; // Создание асинхронного TCP-соединения let stream = Async::<TcpStream>::connect("example.com:80").await?; }
Использование таймеров
#![allow(unused)] fn main() { use async_io::Timer; use std::time::Duration; // Ожидание 1 секунды Timer::after(Duration::from_secs(1)).await; // Периодический таймер let mut interval = Timer::interval(Duration::from_secs(1)); while interval.next().await.is_some() { println!("Прошла 1 секунда"); } }
Зависимости
- polling: Для кросс-платформенного опроса событий ввода-вывода
- futures-lite: Для асинхронных примитивов и комбинаторов
- concurrent-queue: Для внутренней синхронизации
Производительность
- Эффективное пробуждение: Точечное пробуждение только нужных фьючерсов
- Минимальные накладные расходы: Легковесная реализация
- Масштабируемость: Поддержка большого количества одновременных операций
Крейт async_compat
Источник: Документация async_compat
Адаптер совместимости между tokio и futures.
Описание
Существует два вида проблем совместимости между tokio и futures:
-
Типы Tokio не могут использоваться вне контекста tokio, поэтому любая попытка их использования приведет к панике.
Решение: Если вы примените адаптер
Compatк фьючерсу, фьючерс вручную войдет в контекст глобального runtime tokio. Если runtime уже доступен через thread-local переменные tokio, он будет использован. В противном случае, новый однопоточный runtime будет создан по требованию. Это не означает, что фьючерс опрашивается runtime tokio - это означает, что фьючерс устанавливает thread-local переменную, указывающую на глобальный runtime tokio, чтобы типы tokio могли использоваться внутри него. -
Tokio и futures имеют похожие, но разные I/O трейты
AsyncRead,AsyncWrite,AsyncBufReadиAsyncSeek.Решение: Когда адаптер
Compatприменяется к I/O типу, он будет реализовывать трейты противоположного вида. Таким образом вы можете использовать типы на основе tokio там, где ожидаются типы на основе futures, и наоборот.
Вы можете применить адаптер Compat с помощью конструктора Compat::new() или используя любой метод из трейта CompatExt.
Примеры
Эта программа читает строки из stdin и выводит их в stdout, но она не будет работать:
fn main() -> std::io::Result<()> { futures::executor::block_on(async { let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); // Следующая строка не будет работать по двум причинам: // 1. Ошибка времени выполнения, потому что stdin и stdout используются вне контекста tokio. // 2. Ошибка компиляции из-за несовпадения трейтов `AsyncRead` и `AsyncWrite`. futures::io::copy(stdin, &mut stdout).await?; Ok(()) }) }
Чтобы обойти проблемы совместимости, примените адаптер Compat к stdin, stdout и futures::io::copy():
use async_compat::CompatExt; fn main() -> std::io::Result<()> { futures::executor::block_on(async { let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).compat().await?; Ok(()) }) }
Также возможно применить Compat к внешнему фьючерсу, передаваемому в futures::executor::block_on(), а не к самому futures::io::copy(). Когда адаптер применяется к внешнему фьючерсу, отдельные внутренние фьючерсы не нуждаются в адаптере, потому что они теперь все внутри контекста tokio:
use async_compat::{Compat, CompatExt}; fn main() -> std::io::Result<()> { futures::executor::block_on(Compat::new(async { let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).await?; Ok(()) })) }
Адаптер совместимости преобразует между I/O типами на основе tokio и futures в любом направлении. Вот как мы можем написать ту же программу, используя I/O типы на основе futures внутри tokio:
use async_compat::CompatExt; use blocking::Unblock; #[tokio::main] async fn main() -> std::io::Result<()> { let mut stdin = Unblock::new(std::io::stdin()); let mut stdout = Unblock::new(std::io::stdout()); tokio::io::copy(&mut stdin.compat_mut(), &mut stdout.compat_mut()).await?; Ok(()) }
Наконец, мы можем использовать любой крейт на основе tokio из любого другого асинхронного runtime. Вот пример с reqwest и warp:
use async_compat::{Compat, CompatExt}; use warp::Filter; fn main() { futures::executor::block_on(Compat::new(async { // Выполняем HTTP GET запрос. let response = reqwest::get("https://www.rust-lang.org").await.unwrap(); println!("{}", response.text().await.unwrap()); // Запускаем HTTP сервер. let routes = warp::any().map(|| "Hello from warp!"); warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; })) }
Структуры
| Структура | Описание |
|---|---|
Compat | Адаптер совместимости для фьючерсов и I/O типов |
Трейты
| Трейт | Описание |
|---|---|
CompatExt | Применяет адаптер Compat к фьючерсам и I/O типам |
Ключевые особенности
Решение проблем контекста
- Автоматическое создание runtime: Создает runtime tokio при необходимости
- Thread-local контекст: Устанавливает контекст для использования типов tokio
- Прозрачность: Минимальные изменения в существующем коде
Решение проблем I/O трейтов
- Двунаправленное преобразование: Совместимость в обоих направлениях
- Трейты futures ↔ tokio: Автоматическая реализация соответствующих трейтов
- Универсальность: Работает с любыми I/O типами
Методы трейта CompatExt
| Метод | Назначение |
|---|---|
compat() | Применяет адаптер к значению |
compat_mut() | Применяет адаптер к изменяемой ссылке |
compat_ref() | Применяет адаптер к неизменяемой ссылке |
Сценарии использования
Использование tokio кода с futures исполнителем
#![allow(unused)] fn main() { use async_compat::CompatExt; futures::executor::block_on(async { let client = reqwest::Client::new(); let response = client.get("http://example.com").send().compat().await?; // ... }); }
Смешивание I/O типов
#![allow(unused)] fn main() { use async_compat::CompatExt; // Использование tokio TcpStream с futures кодом let stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?; futures::io::copy(&mut stream.compat(), &mut output).await?; }
Интеграция библиотек
#![allow(unused)] fn main() { use async_compat::{Compat, CompatExt}; // Использование warp (tokio) с futures исполнителем futures::executor::block_on(Compat::new(async { warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; })); }
Крейт smol
Источник: Официальная документация smol
Маленький и быстрый асинхронный рантайм.
Этот крейт просто реэкспортирует другие небольшие асинхронные крейты (см. исходный код).
Использование с библиотеками на базе Tokio
Чтобы использовать библиотеки на базе Tokio со smol, примените адаптер async-compat к фьючерсам и типам I/O.
Упрощенная настройка
См. крейт smol-macros, если вам нужна настройка без proc-макросов, с быстрой компиляцией и простым в использовании асинхронным main и/или многопоточным исполнителем (Executor) "из коробки".
Дополнения к smol
В дополнение к проекту smol в качестве прямой замены, вам могут быть полезны и другие части экосистемы фьючерсов, включая futures-concurrency, async-io, futures-lite и async-compat.
Примеры
Подключение к HTTP-сайту, выполнение GET-запроса и вывод ответа в стандартный вывод:
use smol::{io, net, prelude::*, Unblock}; fn main() -> io::Result<()> { smol::block_on(async { let mut stream = net::TcpStream::connect("example.com:80").await?; let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"; stream.write_all(req).await?; let mut stdout = Unblock::new(std::io::stdout()); io::copy(stream, &mut stdout).await?; Ok(()) }) }
Больше примеров можно найти в директории examples.
Модули
| Модуль | Описание |
|---|---|
channel | Асинхронный многопоточный канал, где каждое сообщение может быть получено только одним из всех существующих потребителей |
fs | Примитивы для асинхронной работы с файловой системой |
future | Комбинаторы для трейта Future |
io | Инструменты и комбинаторы для I/O операций |
lock | Асинхронные примитивы синхронизации |
net | Асинхронные примитивы для TCP/UDP/Unix коммуникации |
prelude | Трейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения |
process | Асинхронный интерфейс для работы с процессами |
stream | Комбинаторы для трейта Stream |
Макросы
| Макрос | Описание |
|---|---|
pin | Фиксирует переменную типа T в стеке и перепривязывает её как Pin<&mut T> |
ready | Разворачивает Poll<T> или возвращает Pending |
Структуры
| Структура | Описание |
|---|---|
Async | Асинхронный адаптер для I/O типов |
Executor | Асинхронный исполнитель |
LocalExecutor | Поточно-локальный исполнитель |
Task | Запущенная задача |
Timer | Фьючерс или поток, который эмитирует события по времени |
Unblock | Выполняет блокирующий I/O в пуле потоков |
Функции
| Функция | Описание |
|---|---|
block_on | Блокирует текущий поток на фьючерсе, обрабатывая I/O события в простое |
spawn | Запускает задачу в глобальном исполнителе (по умолчанию однопоточном) |
unblock | Выполняет блокирующий код в пуле потоков |
Модули
Крейт channel
Источник: Документация async-channel
Асинхронный многопоточный канал, где каждое сообщение может быть получено только одним из всех существующих потребителей.
Типы каналов
Существует два вида каналов:
- Ограниченный канал (bounded) с ограниченной емкостью
- Неограниченный канал (unbounded) с неограниченной емкостью
Структура канала
Канал имеет сторону Отправителя (Sender) и сторону Получателя (Receiver). Обе стороны могут быть клонированы и использоваться из нескольких потоков.
Закрытие канала
Когда все отправители или все получатели уничтожаются, канал закрывается. Когда канал закрыт, нельзя отправлять новые сообщения, но оставшиеся сообщения все еще можно получить.
Канал также можно закрыть вручную, вызвав Sender::close() или Receiver::close().
Примеры
#![allow(unused)] fn main() { let (s, r) = async_channel::unbounded(); assert_eq!(s.send("Hello").await, Ok(())); assert_eq!(r.recv().await, Ok("Hello")); }
Структуры
| Структура | Описание |
|---|---|
Receiver | Сторона получения канала |
Recv | Фьючерс, возвращаемый Receiver::recv() |
RecvError | Ошибка, возвращаемая из Receiver::recv() |
Send | Фьючерс, возвращаемый Sender::send() |
SendError | Ошибка, возвращаемая из Sender::send() |
Sender | Сторона отправки канала |
WeakReceiver | Receiver, который не препятствует закрытию канала |
WeakSender | Sender, который не препятствует закрытию канала |
Перечисления
| Перечисление | Описание |
|---|---|
TryRecvError | Ошибка, возвращаемая из Receiver::try_recv() |
TrySendError | Ошибка, возвращаемая из Sender::try_send() |
Функции
Крейт fs
Источник: Документация async-fs
Асинхронные примитивы для работы с файловой системой.
Этот крейт представляет собой асинхронную версию std::fs.
Реализация
Этот крейт использует механизм блокировки (blocking) для выгрузки блокирующих I/O операций в пул потоков.
Примеры
Создание нового файла и запись в него байтов:
#![allow(unused)] fn main() { use async_fs::File; use futures_lite::io::AsyncWriteExt; let mut file = File::create("a.txt").await?; file.write_all(b"Hello, world!").await?; file.flush().await?; }
Модули
| Модуль | Описание |
|---|---|
unix | Unix-специфичные расширения |
Структуры
| Структура | Описание |
|---|---|
DirBuilder | Построитель для создания директорий с настраиваемыми опциями |
DirEntry | Элемент в директории |
File | Открытый файл в файловой системе |
FileType | Структура, представляющая тип файла с методами доступа для каждого типа файла. Возвращается методом Metadata::file_type |
Metadata | Метаданные о файле |
OpenOptions | Построитель для открытия файлов с настраиваемыми опциями |
Permissions | Представление различных разрешений файла |
ReadDir | Поток элементов в директории |
Функции
| Функция | Описание |
|---|---|
canonicalize | Возвращает каноническую форму пути |
copy | Копирует файл в новое место |
create_dir | Создает новую пустую директорию по указанному пути |
create_dir_all | Рекурсивно создает директорию и все ее родительские компоненты, если они отсутствуют |
hard_link | Создает жесткую ссылку в файловой системе |
metadata | Читает метаданные для пути |
read | Читает все содержимое файла как сырые байты |
read_dir | Возвращает поток элементов в директории |
read_link | Читает символическую ссылку и возвращает путь, на который она указывает |
read_to_string | Читает все содержимое файла как строку |
remove_dir | Удаляет пустую директорию |
remove_dir_all | Удаляет директорию и все ее содержимое |
remove_file | Удаляет файл |
rename | Переименовывает файл или директорию в новое место |
set_permissions | Изменяет разрешения файла или директории |
symlink_metadata | Читает метаданные для пути без перехода по символическим ссылкам |
write | Записывает срез байтов как новое содержимое файла |
Модуль future
Источник: Документация futures-lite::future
Комбинаторы для трейта Future.
Примеры
#![allow(unused)] fn main() { use futures_lite::future; for step in 0..3 { println!("step {}", step); // Даем другим задачам шанс выполниться. future::yield_now().await; } }
Структуры
| Структура | Описание |
|---|---|
CatchUnwind | Фьючерс для метода FutureExt::catch_unwind() |
Or | Фьючерс для функции or() и метода FutureExt::or() |
Pending | Создает фьючерс, который никогда не разрешается, представляя вычисление, которое никогда не завершается |
PollFn | Фьючерс для функции poll_fn() |
PollOnce | Фьючерс для функции poll_once() |
Race | Фьючерс для функции race() и метода FutureExt::race() |
Ready | Фьючерс, который сразу готов со значением |
TryZip | Фьючерс для функции try_zip() |
YieldNow | Фьючерс для функции yield_now() |
Zip | Фьючерс для функции zip() |
Трейты
| Трейт | Описание |
|---|---|
Future | Фьючерс представляет асинхронное вычисление, обычно получаемое с помощью async |
FutureExt | Трейт-расширение для Future |
Функции
| Функция | Описание |
|---|---|
block_on | Блокирует текущий поток на фьючерсе |
or | Возвращает результат фьючерса, который завершится первым, с предпочтением future1, если оба готовы |
pending | Создает фьючерс, который никогда не разрешается, представляя вычисление, которое никогда не завершается |
poll_fn | Создает фьючерс из функции, возвращающей Poll |
poll_once | Опрашивает фьючерс всего один раз и возвращает Option с результатом |
race | Возвращает результат фьючерса, который завершится первым, без предпочтений, если оба готовы |
race_with_seed | Соревнует два фьючерса, но с предопределенным случайным сидом |
ready | Создает фьючерс, который сразу готов со значением |
try_zip | Объединяет два фьючерса с возможностью ошибки, ожидая завершения обоих или ошибки одного из них |
yield_now | Будит текущую задачу и возвращает Poll::Pending один раз |
zip | Объединяет два фьючерса, ожидая завершения обоих |
Псевдонимы типов
| Псевдоним | Описание |
|---|---|
Boxed | Псевдоним типа для Pin<Box<dyn Future<Output = T> + Send + 'static>> |
BoxedLocal | Псевдоним типа для Pin<Box<dyn Future<Output = T> + 'static>> |
Модуль io
Источник: Документация futures-lite::io
Инструменты и комбинаторы для ввода-вывода.
Примеры
#![allow(unused)] fn main() { use futures_lite::io::{self, AsyncReadExt}; let input: &[u8] = b"hello"; let mut reader = io::BufReader::new(input); let mut contents = String::new(); reader.read_to_string(&mut contents).await?; }
Структуры
| Структура | Описание |
|---|---|
AssertAsync | Утверждает, что тип, реализующий трейты std::io, может использоваться как асинхронный тип |
AsyncAsSync | Обертка вокруг типа, реализующего AsyncRead или AsyncWrite, которая преобразует опросы Pending в ошибки WouldBlock |
BlockOn | Блокирует все асинхронные I/O операции и реализует трейты std::io |
BufReader | Добавляет буферизацию к читателю |
BufWriter | Добавляет буферизацию к писателю |
Bytes | Читатель для метода AsyncReadExt::bytes() |
Chain | Читатель для метода AsyncReadExt::chain() |
CloseFuture | Фьючерс для метода AsyncWriteExt::close() |
Cursor | Предоставляет буферу в памяти курсор для чтения и записи |
Empty | Читатель для функции empty() |
Error | Тип ошибки для операций I/O трейтов Read, Write, Seek и связанных с ними |
FillBuf | Фьючерс для метода AsyncBufReadExt::fill_buf() |
FlushFuture | Фьючерс для метода AsyncWriteExt::flush() |
Lines | Поток для метода AsyncBufReadExt::lines() |
ReadExactFuture | Фьючерс для метода AsyncReadExt::read_exact() |
ReadFuture | Фьючерс для метода AsyncReadExt::read() |
ReadHalf | Половина для чтения, возвращаемая split() |
ReadLineFuture | Фьючерс для метода AsyncBufReadExt::read_line() |
ReadToEndFuture | Фьючерс для метода AsyncReadExt::read_to_end() |
ReadToStringFuture | Фьючерс для метода AsyncReadExt::read_to_string() |
ReadUntilFuture | Фьючерс для метода AsyncBufReadExt::read_until() |
ReadVectoredFuture | Фьючерс для метода AsyncReadExt::read_vectored() |
Repeat | Читатель для функции repeat() |
SeekFuture | Фьючерс для метода AsyncSeekExt::seek() |
Sink | Писатель для функции sink() |
Split | Поток для метода AsyncBufReadExt::split() |
Take | Читатель для метода AsyncReadExt::take() |
WriteAllFuture | Фьючерс для метода AsyncWriteExt::write_all() |
WriteFuture | Фьючерс для метода AsyncWriteExt::write() |
WriteHalf | Половина для записи, возвращаемая split() |
WriteVectoredFuture | Фьючерс для метода AsyncWriteExt::write_vectored() |
Перечисления
| Перечисление | Описание |
|---|---|
ErrorKind | Список, определяющий общие категории ошибок I/O |
SeekFrom | Перечисление возможных методов поиска в I/O объекте |
Трейты
| Трейт | Описание |
|---|---|
AsyncBufRead | Асинхронное чтение байтов с буферизацией |
AsyncBufReadExt | Трейт-расширение для AsyncBufRead |
AsyncRead | Асинхронное чтение байтов |
AsyncReadExt | Трейт-расширение для AsyncRead |
AsyncSeek | Асинхронный поиск байтов |
AsyncSeekExt | Трейт-расширение для AsyncSeek |
AsyncWrite | Асинхронная запись байтов |
AsyncWriteExt | Трейт-расширение для AsyncWrite |
Функции
| Функция | Описание |
|---|---|
copy | Копирует все содержимое из читателя в писатель |
empty | Создает пустой читатель |
repeat | Создает бесконечного читателя, который повторно читает один и тот же байт |
sink | Создает писатель, который потребляет и отбрасывает все данные |
split | Разделяет поток на половины AsyncRead и AsyncWrite |
Псевдонимы типов
| Псевдоним | Описание |
|---|---|
BoxedReader | Псевдоним типа для Pin<Box<dyn AsyncRead + Send + 'static>> |
BoxedWriter | Псевдоним типа для Pin<Box<dyn AsyncWrite + Send + 'static>> |
Result | Специализированный тип Result для операций I/O |
Крейт lock
Источник: Документация async-lock
Асинхронные примитивы синхронизации.
Этот крейт предоставляет следующие примитивы:
Barrier- позволяет задачам синхронизироваться всем вместе одновременноMutex- мьютекс (взаимное исключение)RwLock- блокировка читатель-писатель, позволяющая любое количество читателей или одного писателяSemaphore- ограничивает количество одновременных операций
Связь с std::sync
В общем случае следует рассмотреть использование типов std::sync вместо типов из этого крейта.
Есть два основных случая использования типов из этого крейта:
- Вам нужно использовать примитив синхронизации в среде
no_std - Вам нужно удерживать блокировку через точку
.await(Удержание блокировкиstd::syncчерез.awaitсделает ваш фьючерс не-Send, а также с высокой вероятностью вызовет взаимные блокировки)
Если вы уже используете libstd и не удерживаете блокировки через точки await (существует Clippy lint под названием await_holding_lock, который выдает предупреждения для этого сценария), вам следует рассмотреть std::sync вместо этого крейта. Эти типы оптимизированы для текущей операционной системы, менее сложны и обычно намного быстрее.
В отличие от этого, система уведомлений async-lock использует std::sync::Mutex внутри, если включена функция std, и переходит к значительно более медленной стратегии, если она не включена. Таким образом, существует мало случаев, когда async-lock выигрывает в производительности по сравнению с std::sync.
Модули
| Модуль | Описание |
|---|---|
futures | Именованные фьючерсы для использования с примитивами async_lock |
Структуры
| Структура | Описание |
|---|---|
Barrier | Счетчик для синхронизации нескольких задач одновременно |
BarrierWaitResult | Возвращается Barrier::wait(), когда все задачи вызвали его |
Mutex | Асинхронный мьютекс |
MutexGuard | Охранник, который освобождает мьютекс при уничтожении |
MutexGuardArc | Владеющий охранник, который освобождает мьютекс при уничтожении |
OnceCell | Область памяти, в которую можно записать не более одного раза |
RwLock | Асинхронная блокировка читатель-писатель |
RwLockReadGuard | Охранник, который освобождает блокировку чтения при уничтожении |
RwLockReadGuardArc | Владеющий, подсчитывающий ссылки охранник, который освобождает блокировку чтения при уничтожении |
RwLockUpgradableReadGuard | Охранник, который освобождает улучшаемую блокировку чтения при уничтожении |
RwLockUpgradableReadGuardArc | Владеющий, подсчитывающий ссылки охранник, который освобождает улучшаемую блокировку чтения при уничтожении |
RwLockWriteGuard | Охранник, который освобождает блокировку записи при уничтожении |
RwLockWriteGuardArc | Владеющий, подсчитывающий ссылки охранник, который освобождает блокировку записи при уничтожении |
Semaphore | Счетчик для ограничения количества одновременных операций |
SemaphoreGuard | Охранник, который освобождает полученное разрешение |
SemaphoreGuardArc | Владеющий охранник, который освобождает полученное разрешение |
Крейт net
Источник: Документация async-net
Асинхронные примитивы для сетевого взаимодействия через TCP/UDP/Unix.
Этот крейт представляет собой асинхронную версию std::net и std::os::unix::net.
Реализация
Этот крейт использует async-io для асинхронного I/O и blocking для DNS-запросов.
Примеры
Простой UDP-сервер, который возвращает сообщения обратно отправителю:
#![allow(unused)] fn main() { use async_net::UdpSocket; let socket = UdpSocket::bind("127.0.0.1:8080").await?; let mut buf = vec![0u8; 1024]; loop { let (n, addr) = socket.recv_from(&mut buf).await?; socket.send_to(&buf[..n], &addr).await?; } }
Модули
| Модуль | Описание |
|---|---|
unix | Unix-доменные сокеты |
Структуры
| Структура | Описание |
|---|---|
AddrParseError | Ошибка, которая может быть возвращена при разборе IP-адреса или адреса сокета |
Incoming | Поток входящих TCP-подключений |
Ipv4Addr | IPv4-адрес |
Ipv6Addr | IPv6-адрес |
SocketAddrV4 | Адрес IPv4-сокета |
SocketAddrV6 | Адрес IPv6-сокета |
TcpListener | TCP-сервер, прослушивающий подключения |
TcpStream | TCP-подключение |
UdpSocket | UDP-сокет |
Перечисления
| Перечисление | Описание |
|---|---|
IpAddr | IP-адрес, либо IPv4, либо IPv6 |
Shutdown | Возможные значения, которые могут быть переданы в метод TcpStream::shutdown |
SocketAddr | Интернет-адрес сокета, либо IPv4, либо IPv6 |
Трейты
| Трейт | Описание |
|---|---|
AsyncToSocketAddrs | Преобразует или разрешает адреса в значения SocketAddr |
Функции
| Функция | Описание |
|---|---|
resolve | Преобразует или разрешает адреса в значения SocketAddr |
Модуль prelude
Источник: Документация futures-lite::prelude
Трейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения.
Примеры
#![allow(unused)] fn main() { use futures_lite::prelude::*; }
Трейты
| Трейт | Описание |
|---|---|
AsyncBufRead | Асинхронное чтение байтов с буферизацией |
AsyncRead | Асинхронное чтение байтов |
AsyncSeek | Асинхронный поиск байтов |
AsyncWrite | Асинхронная запись байтов |
Future | Фьючерс представляет асинхронное вычисление, обычно получаемое с помощью async |
Stream | Поток значений, производимых асинхронно |
FutureExt | Трейт-расширение для Future |
StreamExt | Трейт-расширение для Stream |
AsyncBufReadExt | Трейт-расширение для AsyncBufRead |
AsyncReadExt | Трейт-расширение для AsyncRead |
AsyncSeekExt | Трейт-расширение для AsyncSeek |
AsyncWriteExt | Трейт-расширение для AsyncWrite |
Крейт process
Источник: Документация async-process
Асинхронный интерфейс для работы с процессами.
Этот крейт представляет собой асинхронную версию std::process.
Реализация
Фоновый поток с именем "async-process" лениво создается при первом использовании, который ожидает завершения порожденных дочерних процессов и затем вызывает системный вызов wait() для очистки процессов-«зомби». Это отличается от API процессов в стандартной библиотеке, где удаление работающего Child приводит к утечке его ресурсов.
Этот крейт использует async-io для асинхронного I/O в Unix-подобных системах и blocking для асинхронного I/O в Windows.
Примеры
Запуск процесса и сбор его вывода:
#![allow(unused)] fn main() { use async_process::Command; let out = Command::new("echo").arg("hello").arg("world").output().await?; assert_eq!(out.stdout, b"hello world\n"); }
Чтение вывода построчно по мере его создания:
#![allow(unused)] fn main() { use async_process::{Command, Stdio}; use futures_lite::{io::BufReader, prelude::*}; let mut child = Command::new("find") .arg(".") .stdout(Stdio::piped()) .spawn()?; let mut lines = BufReader::new(child.stdout.take().unwrap()).lines(); while let Some(line) = lines.next().await { println!("{}", line?); } }
Модули
| Модуль | Описание |
|---|---|
unix | Unix-специфичные расширения |
Структуры
| Структура | Описание |
|---|---|
Child | Порождённый дочерний процесс |
ChildStderr | Дескриптор стандартного ошибочного вывода (stderr) дочернего процесса |
ChildStdin | Дескриптор стандартного ввода (stdin) дочернего процесса |
ChildStdout | Дескриптор стандартного вывода (stdout) дочернего процесса |
Command | Построитель для порождения процессов |
ExitStatus | Описывает результат процесса после его завершения |
Output | Вывод завершенного процесса |
Stdio | Описывает, что делать со стандартным I/O потоком для дочернего процесса при передаче в методы stdin, stdout и stderr команды Command |
Функции
| Функция | Описание |
|---|---|
driver | Запускает драйвер для асинхронных процессов |
Модуль stream
Источник: Документация futures-lite::stream
Комбинаторы для трейта Stream.
Примеры
#![allow(unused)] fn main() { use futures_lite::stream::{self, StreamExt}; let mut s = stream::iter(vec![1, 2, 3]); assert_eq!(s.next().await, Some(1)); assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, Some(3)); assert_eq!(s.next().await, None); }
Структуры
| Структура | Описание |
|---|---|
AllFuture | Фьючерс для метода StreamExt::all() |
AnyFuture | Фьючерс для метода StreamExt::any() |
BlockOn | Итератор для функции block_on() |
Chain | Поток для метода StreamExt::chain() |
Cloned | Поток для метода StreamExt::cloned() |
CollectFuture | Фьючерс для метода StreamExt::collect() |
Copied | Поток для метода StreamExt::copied() |
CountFuture | Фьючерс для метода StreamExt::count() |
Cycle | Поток для метода StreamExt::cycle() |
Drain | Поток для метода StreamExt::drain() |
Empty | Поток для функции empty() |
Enumerate | Поток для метода StreamExt::enumerate() |
Filter | Поток для метода StreamExt::filter() |
FilterMap | Поток для метода StreamExt::filter_map() |
FindFuture | Фьючерс для метода StreamExt::find() |
FindMapFuture | Фьючерс для метода StreamExt::find_map() |
FlatMap | Поток для метода StreamExt::flat_map() |
Flatten | Поток для метода StreamExt::flatten() |
FoldFuture | Фьючерс для метода StreamExt::fold() |
ForEachFuture | Фьючерс для метода StreamExt::for_each() |
Fuse | Поток для метода StreamExt::fuse() |
Inspect | Поток для метода StreamExt::inspect() |
Iter | Поток для функции iter() |
LastFuture | Фьючерс для метода StreamExt::last() |
Map | Поток для метода StreamExt::map() |
NextFuture | Фьючерс для метода StreamExt::next() |
NthFuture | Фьючерс для метода StreamExt::nth() |
Once | Поток для функции once() |
OnceFuture | Поток для функции once_future() |
Or | Поток для функции or() и метода StreamExt::or() |
PartitionFuture | Фьючерс для метода StreamExt::partition() |
Pending | Поток для функции pending() |
PollFn | Поток для функции poll_fn() |
PositionFuture | Фьючерс для метода StreamExt::position() |
Race | Поток для функции race() и метода StreamExt::race() |
Repeat | Поток для функции repeat() |
RepeatWith | Поток для функции repeat_with() |
Scan | Поток для метода StreamExt::scan() |
Skip | Поток для метода StreamExt::skip() |
SkipWhile | Поток для метода StreamExt::skip_while() |
StepBy | Поток для метода StreamExt::step_by() |
Take | Поток для метода StreamExt::take() |
TakeWhile | Поток для метода StreamExt::take_while() |
Then | Поток для метода StreamExt::then() |
TryCollectFuture | Фьючерс для метода StreamExt::try_collect() |
TryFoldFuture | Фьючерс для метода StreamExt::try_fold() |
TryForEachFuture | Фьючерс для метода StreamExt::try_for_each() |
TryNextFuture | Фьючерс для метода StreamExt::try_next() |
TryUnfold | Поток для функции try_unfold() |
Unfold | Поток для функции unfold() |
UnzipFuture | Фьючерс для метода StreamExt::unzip() |
Zip | Поток для метода StreamExt::zip() |
Трейты
Функции
| Функция | Описание |
|---|---|
block_on | Преобразует поток в блокирующий итератор |
empty | Создает пустой поток |
iter | Создает поток из итератора |
once | Создает поток, который выдает один элемент |
once_future | Создает поток, который вызывает заданный фьючерс как свой первый элемент, а затем больше не производит элементов |
or | Объединяет два потока, предпочитая элементы из stream1, когда оба потока готовы |
pending | Создает поток, который всегда находится в состоянии ожидания |
poll_fn | Создает поток из функции, возвращающей Poll |
race | Объединяет два потока без предпочтений, когда оба готовы |
race_with_seed | Соревнует два потока, но с предоставленным пользователем сидом для случайности |
repeat | Создает бесконечный поток, который повторно выдает один и тот же элемент |
repeat_with | Создает бесконечный поток из замыкания, генерирующего элементы |
try_unfold | Создает поток из начального значения и замыкания с возможностью ошибки, работающего с ним |
unfold | Создает поток из начального значения и асинхронного замыкания, работающего с ним |
Псевдонимы типов
| Псевдоним | Описание |
|---|---|
Boxed | Псевдоним типа для Pin<Box<dyn Stream<Item = T> + Send + 'static>> |
BoxedLocal | Псевдоним типа для Pin<Box<dyn Stream<Item = T> + 'static>> |
Макросы
Макрос pin
Источник: Документация futures-lite::pin
#![allow(unused)] fn main() { macro_rules! pin { ($($x:ident),* $(,)?) => { ... }; } }
Фиксирует переменную типа T в стеке и перепривязывает её как Pin<&mut T>.
Примеры
#![allow(unused)] fn main() { use futures_lite::{future, pin}; use std::fmt::Debug; use std::future::Future; use std::pin::Pin; use std::time::Instant; // Инспектирует каждый вызов `Future::poll()`. async fn inspect<T: Debug>(f: impl Future<Output = T>) -> T { pin!(f); future::poll_fn(|cx| dbg!(f.as_mut().poll(cx))).await } let f = async { 1 + 2 }; inspect(f).await; }
Описание
Макрос pin! используется для безопасного закрепления (pinning) переменных в стеке. Это особенно полезно при работе с асинхронным кодом, где необходимо гарантировать, что данные не будут перемещены в памяти после того, как на них созданы ссылки в асинхронных фьючерсах.
Как работает
- Принимает один или несколько идентификаторов переменных
- Для каждой переменной создает закрепленную ссылку
Pin<&mut T> - Перепривязывает исходную переменную к закрепленной версии
Использование
#![allow(unused)] fn main() { use futures_lite::pin; let mut value = SomeFuture::new(); pin!(value); // Теперь `value` имеет тип `Pin<&mut SomeFuture>` // Можно использовать закрепленную переменную let pinned = value.as_mut(); }
Макрос обеспечивает безопасное закрепление, которое необходимо для вызова методов poll на фьючерсах и других типах, требующих гарантий неперемещаемости.
Макрос ready
Источник: Документация futures-lite::ready
#![allow(unused)] fn main() { macro_rules! ready { ($e:expr $(,)?) => { ... }; } }
Разворачивает Poll<T> или возвращает Pending.
Примеры
#![allow(unused)] fn main() { use futures_lite::{future, prelude::*, ready}; use std::pin::Pin; use std::task::{Context, Poll}; fn do_poll(cx: &mut Context<'_>) -> Poll<()> { let mut fut = future::ready(42); let fut = Pin::new(&mut fut); let num = ready!(fut.poll(cx)); // ... используем num Poll::Ready(()) } }
Как работает
Вызов макроса ready! раскрывается в:
#![allow(unused)] fn main() { let num = match fut.poll(cx) { Poll::Ready(t) => t, Poll::Pending => return Poll::Pending, }; }
Описание
Макрос ready! используется для упрощения работы с типами Poll<T> в асинхронном коде. Он позволяет:
- Извлечь значение из
Poll::Ready(T) - Немедленно вернуть
Poll::Pending, если результат еще не готов
Это особенно полезно при реализации методов poll вручную, где необходимо последовательно опрашивать несколько фьючерсов или асинхронных операций.
Типичное использование
#![allow(unused)] fn main() { use futures_lite::ready; use std::task::{Context, Poll}; fn poll_multiple_operations( cx: &mut Context<'_>, ) -> Poll<Result<(), MyError>> { // Опрашиваем первую операцию let result1 = ready!(self.op1.poll(cx))?; // Опрашиваем вторую операцию let result2 = ready!(self.op2.poll(cx))?; Poll::Ready(Ok(())) } }
Макрос автоматически обрабатывает распространение ошибок, если тип Poll содержит Result, и обеспечивает корректное возвращение управления при Poll::Pending.
Структуры
Структура Async
Источник: Документация smol::Async
#![allow(unused)] fn main() { pub struct Async<T> { /* private fields */ } }
Асинхронный адаптер для I/O типов.
Этот тип переводит I/O дескриптор в неблокирующий режим, регистрирует его в epoll/kqueue/event ports/IOCP и предоставляет асинхронный интерфейс для работы с ним.
Особенности
Async является низкоуровневым примитивом и имеет некоторые ограничения:
- Для более высокоуровневых примитивов, построенных на основе
Async, используйтеasync-netилиasync-process(на Unix) - Небезопасно обращаться к внутреннему I/O источнику изменяемо с помощью этого примитива
- Трейты
AsyncReadиAsyncWriteне реализованы по умолчанию, если не гарантировано, что ресурс не будет инвалидирован при чтении или записи
Поддерживаемые типы
Async поддерживает все сетевые типы, а также некоторые OS-специфичные файловые дескрипторы, такие как timerfd и inotify.
Не используйте Async с типами типа File, Stdin, Stdout или Stderr, поскольку все операционные системы имеют проблемы с ними в неблокирующем режиме.
Конкурентный I/O
&Async<T> реализует AsyncRead и AsyncWrite, если &T реализует эти трейты, что означает, что задачи могут конкурентно читать и писать с использованием общих ссылок.
Важно: только одна задача может читать за раз и только одна задача может писать за раз. Допустимо иметь две задачи, где одна читает, а другая пишет одновременно, но недопустимо иметь две задачи, читающие одновременно или пишущие одновременно.
Закрытие
Закрытие стороны записи Async с помощью close() просто выполняет сброс. Если вы хотите завершить работу TCP или Unix сокета, используйте Shutdown.
Примеры
Подключение к серверу и возврат входящих сообщений обратно на сервер:
#![allow(unused)] fn main() { use async_io::Async; use futures_lite::io; use std::net::TcpStream; // Подключаемся к локальному серверу. let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?; // Копируем все сообщения из стороны чтения потока в сторону записи. io::copy(&stream, &stream).await?; }
Реализации
Источник: Документация smol::Async
Конструкторы
| Метод | Сигнатура | Описание |
|---|---|---|
new | pub fn new(io: T) -> Result<Async<T>> | Создает новый асинхронный I/O дескриптор |
Методы доступа
| Метод | Сигнатура | Описание |
|---|---|---|
get_ref | pub fn get_ref(&self) -> &T | Получает ссылку на внутренний дескриптор |
get_mut | pub fn get_mut(&mut self) -> &mut T | Получает изменяемую ссылку на внутренний дескриптор |
into_inner | pub fn into_inner(self) -> Result<T> | Извлекает внутренний дескриптор |
Методы для чтения/записи
| Метод | Сигнатура | Описание |
|---|---|---|
read_with | pub fn read_with<R>(&self, op: impl FnOnce(&T) -> Result<R>) -> impl Future<Output = Result<R>> | Выполняет блокирующую операцию чтения асинхронно |
read_with_mut | pub fn read_with_mut<R>(&self, op: impl FnOnce(&mut T) -> Result<R>) -> impl Future<Output = Result<R>> | Выполняет блокирующую операцию чтения с изменяемым доступом |
write_with | pub fn write_with<R>(&self, op: impl FnOnce(&T) -> Result<R>) -> impl Future<Output = Result<R>> | Выполняет блокирующую операцию записи асинхронно |
write_with_mut | pub fn write_with_mut<R>(&self, op: impl FnOnce(&mut T) -> Result<R>) -> impl Future<Output = Result<R>> | Выполняет блокирующую операцию записи с изменяемым доступом |
Методы доступности
| Метод | Сигнатура | Описание |
|---|---|---|
readable | pub fn readable(&self) -> Readable<'_, T> | Создает фьючерс для ожидания доступности для чтения |
writable | pub fn writable(&self) -> Writable<'_, T> | Создает фьючерс для ожидания доступности для записи |
poll_readable | pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<()>> | Опрашивает доступность для чтения |
poll_writable | pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<()>> | Опрашивает доступность для записи |
Сетевые методы (для TcpStream)
| Метод | Сигнатура | Описание |
|---|---|---|
connect | pub async fn connect(addr: impl ToSocketAddrs) -> Result<Async<TcpStream>> | Асинхронно подключается к адресу |
peer_addr | pub fn peer_addr(&self) -> Result<SocketAddr> | Возвращает адрес удаленного peer |
local_addr | pub fn local_addr(&self) -> Result<SocketAddr> | Возвращает локальный адрес |
shutdown | pub fn shutdown(&self, how: Shutdown) -> Result<()> | Завершает часть соединения |
Сетевые методы (для TcpListener)
| Метод | Сигнатура | Описание |
|---|---|---|
bind | pub fn bind(addr: impl ToSocketAddrs) -> Result<Async<TcpListener>> | Привязывает сокет к адресу |
accept | pub async fn accept(&self) -> Result<(Async<TcpStream>, SocketAddr)> | Асинхронно принимает входящее подключение |
local_addr | pub fn local_addr(&self) -> Result<SocketAddr> | Возвращает локальный адрес |
Сетевые методы (для UdpSocket)
| Метод | Сигнатура | Описание |
|---|---|---|
bind | pub fn bind(addr: impl ToSocketAddrs) -> Result<Async<UdpSocket>> | Привязывает UDP сокет к адресу |
connect | pub async fn connect(&self, addr: impl ToSocketAddrs) -> Result<()> | Подключает UDP сокет к адресу |
send | pub async fn send(&self, buf: &[u8]) -> Result<usize> | Отправляет данные в подключенный адрес |
recv | pub async fn recv(&self, buf: &mut [u8]) -> Result<usize> | Получает данные из подключенного адреса |
send_to | pub async fn send_to(&self, buf: &[u8], addr: impl ToSocketAddrs) -> Result<usize> | Отправляет данные на указанный адрес |
recv_from | pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> | Получает данные и адрес отправителя |
local_addr | pub fn local_addr(&self) -> Result<SocketAddr> | Возвращает локальный адрес |
peer_addr | pub fn peer_addr(&self) -> Result<SocketAddr> | Возвращает адрес удаленного peer |
Трейты
Реализации From
| Трейт | Описание |
|---|---|
From<Async<T>> | Преобразование в внутренний тип |
Реализации для трейтов асинхронного I/O
| Трейт | Методы | Описание |
|---|---|---|
AsyncRead | poll_read | Асинхронное чтение данных |
AsyncWrite | poll_write, poll_flush, poll_close | Асинхронная запись данных |
AsyncBufRead | poll_fill_buf, consume | Асинхронное буферизованное чтение |
AsyncSeek | poll_seek | Асинхронное перемещение по потоку |
Другие трейты
| Трейт | Описание |
|---|---|
Debug | Отладочное представление |
Clone | Клонирование (если T: Clone) |
AsRef<T> | Получение ссылки на внутренний тип |
AsMut<T> | Получение изменяемой ссылки на внутренний тип |
Зависимости
Структура Async зависит от следующих трейтов и типов:
std::io::Result,std::io::Errorstd::net::ToSocketAddrs,SocketAddrstd::net::{TcpStream, TcpListener, UdpSocket}std::future::Futurestd::task::{Context, Poll}futures_io::{AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek}std::fmt::Debugstd::clone::Clone
Вспомогательные типы
| Тип | Описание |
|---|---|
Readable<'a, T> | Фьючерс для ожидания доступности чтения |
Writable<'a, T> | Фьючерс для ожидания доступности записи |
Структура Executor
Источник: Документация async-executor::Executor
#![allow(unused)] fn main() { pub struct Executor<'a> { /* private fields */ } }
Асинхронный исполнитель (executor).
Примеры
Многопоточный исполнитель:
#![allow(unused)] fn main() { use async_channel::unbounded; use async_executor::Executor; use easy_parallel::Parallel; use futures_lite::future; let ex = Executor::new(); let (signal, shutdown) = unbounded::<()>(); Parallel::new() // Запускаем четыре потока исполнителя. .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) // Запускаем основной фьючерс в текущем потоке. .finish(|| future::block_on(async { println!("Hello world!"); drop(signal); })); }
Методы
new
#![allow(unused)] fn main() { pub const fn new() -> Executor<'a> }
Создает новый исполнитель.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; let ex = Executor::new(); }
is_empty
#![allow(unused)] fn main() { pub fn is_empty(&self) -> bool }
Возвращает true, если нет незавершенных задач.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; let ex = Executor::new(); assert!(ex.is_empty()); let task = ex.spawn(async { println!("Hello world"); }); assert!(!ex.is_empty()); assert!(ex.try_tick()); assert!(ex.is_empty()); }
spawn
#![allow(unused)] fn main() { pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> where T: Send + 'a, }
Запускает задачу в исполнителе.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; let ex = Executor::new(); let task = ex.spawn(async { println!("Hello world"); }); }
spawn_many
#![allow(unused)] fn main() { pub fn spawn_many<T, F>( &self, futures: impl IntoIterator<Item = F>, handles: &mut impl Extend<Task<<F as Future>::Output>>, ) where T: Send + 'a, F: Future<Output = T> + Send + 'a, }
Запускает множество задач в исполнителе.
В отличие от метода spawn, этот метод блокирует внутреннюю блокировку задач исполнителя один раз и запускает все задачи за одну операцию. При большом количестве задач это может улучшить конкуренцию.
Пример:
#![allow(unused)] fn main() { use async_executor::Executor; use futures_lite::{stream, prelude::*}; use std::future::ready; let mut ex = Executor::new(); let futures = [ ready(1), ready(2), ready(3) ]; // Запускаем все фьючерсы в исполнителе одновременно. let mut tasks = vec![]; ex.spawn_many(futures, &mut tasks); // Ожидаем завершения всех. let results = ex.run(async move { stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await }).await; assert_eq!(results, [1, 2, 3]); }
try_tick
#![allow(unused)] fn main() { pub fn try_tick(&self) -> bool }
Пытается выполнить задачу, если хотя бы одна запланирована.
Выполнение запланированной задачи означает просто однократное опрашивание ее фьючерса.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; let ex = Executor::new(); assert!(!ex.try_tick()); // нет задач для выполнения let task = ex.spawn(async { println!("Hello world"); }); assert!(ex.try_tick()); // задача найдена и выполнена }
tick
#![allow(unused)] fn main() { pub async fn tick(&self) }
Выполняет одну задачу.
Выполнение задачи означает просто однократное опрашивание ее фьючерса.
Если при вызове этого метода нет запланированных задач, он будет ждать, пока одна не появится.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; use futures_lite::future; let ex = Executor::new(); let task = ex.spawn(async { println!("Hello world"); }); future::block_on(ex.tick()); // выполняет задачу }
run
#![allow(unused)] fn main() { pub async fn run<T>(&self, future: impl Future<Output = T>) -> T }
Запускает исполнитель до завершения указанного фьючерса.
Примеры:
#![allow(unused)] fn main() { use async_executor::Executor; use futures_lite::future; let ex = Executor::new(); let task = ex.spawn(async { 1 + 2 }); let res = future::block_on(ex.run(async { task.await * 2 })); assert_eq!(res, 6); }
Реализации трейтов
Debug
#![allow(unused)] fn main() { impl Debug for Executor<'_> }
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>- Форматирует значение с помощью заданного форматировщика.
Default
#![allow(unused)] fn main() { impl<'a> Default for Executor<'a> }
fn default() -> Executor<'a>- Возвращает "значение по умолчанию" для типа.
Drop
#![allow(unused)] fn main() { impl Drop for Executor<'_> }
fn drop(&mut self)- Выполняет деструктор для этого типа.
Маркерные трейты
RefUnwindSafe- Безопасен для размотки через ссылкуSend- Может быть передан между потокамиSync- Может быть разделен между потокамиUnwindSafe- Безопасен для размотки
Автоматические реализации трейтов
!Freeze- Не замораживаетсяUnpin- Может быть перемещено после закрепления
Стандартные реализации
| Трейт | Описание |
|---|---|
Any | Проверка типов во время выполнения |
Borrow<T> | Заимствование как &T |
BorrowMut<T> | Заимствование как &mut T |
From<T> | Преобразование из типа T |
Instrument | Инструментирование для трассировки |
Into<U> | Преобразование в тип U |
TryFrom<U> | Попытка преобразования из типа U |
TryInto<U> | Попытка преобразования в тип U |
WithSubscriber | Работа с подписчиками |
Основные методы
| Метод | Сигнатура | Возвращаемый тип | Описание |
|---|---|---|---|
new | pub fn new() -> Executor<'a> | Executor<'a> | Создает новый глобальный исполнитель |
spawn | pub fn spawn<T>(future: impl Future<Output = T> + Send + 'static) -> Task<T> | Task<T> | Запускает задачу в глобальном исполнителе |
block_on | pub fn block_on<T>(future: impl Future<Output = T>) -> T | T | Блокирует текущий поток до завершения фьючерса |
Статические методы
| Метод | Сигнатура | Возвращаемый тип | Описание |
|---|---|---|---|
global | pub fn global() -> &'static Executor<'static> | &'static Executor | Возвращает ссылку на глобальный исполнитель |
Методы экземпляра
| Метод | Сигнатура | Возвращаемый тип | Описание |
|---|---|---|---|
run | pub fn run<T>(&self, future: impl Future<Output = T>) -> T | T | Запускает исполнитель до завершения фьючерса |
try_tick | pub fn try_tick(&self) -> bool | bool | Пытается выполнить одну задачу, если есть готовые |
tick | pub async fn tick(&self) | () | Выполняет одну задачу, ожидая если нет готовых |
Зависимости типов
Основные типы
| Тип | Источник | Назначение |
|---|---|---|
Task<T> | smol::Task | Представляет запущенную асинхронную задачу |
Future | std::future::Future | Базовый трейт для асинхронных вычислений |
Executor<'a> | smol::Executor | Основной тип исполнителя |
Трейты времени выполнения
| Трейт | Назначение |
|---|---|
Send | Гарантирует безопасную передачу между потоками |
'static | Гарантирует время жизни на всю программу |
Реализации трейтов
Стандартные трейты
| Трейт | Реализация | Назначение |
|---|---|---|
Debug | impl Debug for Executor<'_> | Отладочное представление |
Default | impl Default for Executor<'_> | Создание значения по умолчанию |
Drop | impl Drop for Executor<'_> | Управление временем жизни |
Маркерные трейты
| Трейт | Назначение |
|---|---|
Send | Безопасная передача между потоками |
Sync | Безопасное разделение между потоками |
UnwindSafe | Безопасность при размотке стека |
RefUnwindSafe | Безопасность ссылок при размотке |
Автоматические реализации
| Трейт | Особенности |
|---|---|
Unpin | Может быть перемещено после закрепления |
!Freeze | Не может быть заморожено |
Особенности использования
Глобальный исполнитель
#![allow(unused)] fn main() { // Получение глобального исполнителя let executor = Executor::global(); // Запуск задачи в глобальном исполнителе let task = executor.spawn(async { // асинхронный код }); }
Локальное использование
#![allow(unused)] fn main() { // Создание локального исполнителя let executor = Executor::new(); // Блокировка до завершения executor.block_on(async { // асинхронный код }); }
Ограничения и требования
- Задачи должны реализовывать
Send + 'static - Исполнитель предназначен для многопоточного использования
block_onблокирует текущий потокspawnне блокирует и возвращаетTaskсразу
Структура LocalExecutor - Сводная таблица методов
Источник: Документация async-executor::LocalExecutor
- Только один поток - нельзя передавать между потоками
- Нет Send - задачи не должны быть Send
- Локальное время жизни - привязан к области видимости
- Блокирующие итераторы - могут заблокировать исполнитель
Сравнение с Executor
| Характеристика | LocalExecutor | Executor |
|---|---|---|
| Многопоточность | ❌ Только один поток | ✅ Многопоточный |
| Требование Send | ❌ Не требуется | ✅ Обязательно |
| Производительность | ✅ Выше для локальных задач | ✅ Лучше для CPU-bound |
| Безопасность | ✅ Нет гонок данных | ✅ Синхронизированный |
Основные методы
| Метод | Сигнатура | Возвращаемый тип | Описание | Особенности |
|---|---|---|---|---|
new | pub const fn new() -> LocalExecutor<'a> | LocalExecutor<'a> | Создает однопоточный исполнитель | Только для текущего потока |
is_empty | pub fn is_empty(&self) -> bool | bool | Проверяет отсутствие незавершенных задач | true если задач нет |
spawn | pub fn spawn<T>(future: impl Future<Output = T> + 'a) -> Task<T> | Task<T> | Запускает задачу в исполнителе | Не требует Send |
spawn_many | pub fn spawn_many<T, F>(futures: impl IntoIterator<Item = F>, handles: &mut impl Extend<Task<F::Output>>) | () | Массовый запуск задач | Улучшает производительность |
try_tick | pub fn try_tick(&self) -> bool | bool | Пытается выполнить одну задачу | Неблокирующий |
tick | pub async fn tick(&self) | () | Выполняет одну задачу | Блокирует если задач нет |
run | pub async fn run<T>(future: impl Future<Output = T>) -> T | T | Запускает исполнитель до завершения фьючерса | Основной метод выполнения |
Подробное описание методов
Конструкторы
| Метод | Особенности | Пример использования |
|---|---|---|
new | Создает локальный исполнитель для текущего потока | let ex = LocalExecutor::new(); |
default | Через трейт Default (аналогично new) | let ex = LocalExecutor::default(); |
Управление задачами
| Метод | Назначение | Отличие от Executor |
|---|---|---|
spawn | Запуск одной задачи | Не требует Send |
spawn_many | Пакетный запуск задач | Блокировка не снимается (нет других потоков) |
is_empty | Проверка состояния | Аналогично Executor |
Выполнение
| Метод | Тип выполнения | Применение |
|---|---|---|
try_tick | Неблокирующий | Для фонового выполнения |
tick | Асинхронный | Для кооперативной многозадачности |
run | Блокирующий | Основной цикл выполнения |
Требования к типам
Для задач
| Параметр | Требование | Обоснование |
|---|---|---|
T | 'a | Время жизни задачи |
F | Future<Output = T> + 'a | Локальный фьючерс |
| Отсутствует | Send | Только один поток |
Реализации трейтов
Стандартные трейты
| Трейт | Назначение | Особенности |
|---|---|---|
Debug | Отладочный вывод | Форматирование |
Default | Значение по умолчанию | Аналогично new() |
Маркерные трейты (отличия от Executor)
| Трейт | Реализация | Обоснование |
|---|---|---|
!Send | Не реализован | Только для текущего потока |
!Sync | Не реализован | Нельзя разделять между потоками |
UnwindSafe | Реализован | Безопасность при панике |
RefUnwindSafe | Реализован | Безопасность ссылок при панике |
Автоматические реализации
| Трейт | Статус | Примечание |
|---|---|---|
Unpin | Реализован | Можно перемещать |
!Freeze | Не замораживается | - |
Примеры использования
Базовое использование
#![allow(unused)] fn main() { use async_executor::LocalExecutor; use futures_lite::future; let local_ex = LocalExecutor::new(); future::block_on(local_ex.run(async { println!("Hello world!"); })); }
Запуск задач
#![allow(unused)] fn main() { let ex = LocalExecutor::new(); // Одиночная задача let task = ex.spawn(async { 1 + 2 }); // Массовый запуск let mut tasks = vec![]; ex.spawn_many([async { 1 }, async { 2 }], &mut tasks); }
Структура Task
Источник: Документация smol::Task
#![allow(unused)] fn main() { pub struct Task<T, M = ()> { /* private fields */ } }
Запущенная задача.
Описание
Задачу Task можно ожидать (await) для получения результата ее фьючерса.
Удаление (dropping) задачи Task отменяет ее, что означает, что ее фьючерс больше не будет опрашиваться. Чтобы удалить дескриптор задачи без ее отмены, используйте вместо этого метод detach(). Для грациозной отмены задачи и ожидания ее полного уничтожения используйте метод cancel().
Важно: отмена задачи фактически пробуждает ее и перепланирует один последний раз. Затем исполнитель может уничтожить задачу, просто удалив ее Runnable или вызвав run().
Параметры типа
T- тип результата, возвращаемого задачейM- тип метаданных (по умолчанию())
Примеры
#![allow(unused)] fn main() { use smol::{future, Executor}; use std::thread; let ex = Executor::new(); // Запускаем фьючерс в исполнителе. let task = ex.spawn(async { println!("Hello from a task!"); 1 + 2 }); // Запускаем поток исполнителя. thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); // Ожидаем результат задачи. assert_eq!(future::block_on(task), 3); }
- Ожидание результата: Задачи можно ожидать через
awaitдля получения результата- Автоматическая отмена: Удаление задачи приводит к ее отмене
- Грациозная отмена: Метод
cancel()обеспечивает корректное завершение- Фоновое выполнение:
detach()позволяет задаче работать в фоне
Основные методы
| Метод | Сигнатура | Возвращаемый тип | Описание |
|---|---|---|---|
detach | pub fn detach(self) | () | Отсоединяет задачу для фонового выполнения |
cancel | pub async fn cancel(self) -> Option<T> | Option<T> | Отменяет задачу и ждет завершения |
fallible | pub fn fallible(self) -> FallibleTask<T, M> | FallibleTask<T, M> | Преобразует в FallibleTask |
is_finished | pub fn is_finished(&self) -> bool | bool | Проверяет завершена ли задача |
metadata | pub fn metadata(&self) -> &M | &M | Возвращает метаданные задачи |
Подробное описание методов
Управление выполнением
| Метод | Назначение | Особенности | Пример использования |
|---|---|---|---|
detach | Отсоединяет задачу | Задача продолжает работать в фоне | task.detach() |
cancel | Грациозная отмена | Ждет завершения, возвращает результат | task.cancel().await |
fallible | Создает fallible версию | Обрабатывает отмену через None | task.fallible() |
Состояние и метаданные
| Метод | Назначение | Примечания |
|---|---|---|
is_finished | Проверка состояния | Может измениться сразу после вызова |
metadata | Доступ к метаданным | По умолчанию () |
Примеры использования
Базовое использование
#![allow(unused)] fn main() { use smol::{future, Executor}; use std::thread; let ex = Executor::new(); // Запуск задачи в исполнителе let task = ex.spawn(async { println!("Hello from a task!"); 1 + 2 }); // Ожидание результата assert_eq!(future::block_on(task), 3); }
Отсоединение задачи
#![allow(unused)] fn main() { use smol::{Executor, Timer}; use std::time::Duration; let ex = Executor::new(); // Демон-задача ex.spawn(async { loop { println!("I'm a daemon task looping forever."); Timer::after(Duration::from_secs(1)).await; } }) .detach(); }
Грациозная отмена
#![allow(unused)] fn main() { use smol::{future, Executor, Timer}; use std::thread; use std::time::Duration; let ex = Executor::new(); let task = ex.spawn(async { loop { println!("You can still cancel me!"); Timer::after(Duration::from_secs(1)).await; } }); future::block_on(async { Timer::after(Duration::from_secs(3)).await; task.cancel().await; // Грациозная отмена }); }
Реализации трейтов
Основные трейты
| Трейт | Реализация | Назначение |
|---|---|---|
Future | impl<T, M> Future for Task<T, M> | Ожидание результата задачи |
Debug | impl<T, M> Debug for Task<T, M> | Отладочное представление |
Drop | impl<T, M> Drop for Task<T, M> | Автоматическая отмена при удалении |
Маркерные трейты
| Трейт | Условия | Назначение |
|---|---|---|
Send | T: Send, M: Send + Sync | Передача между потоками |
Sync | M: Send + Sync | Разделение между потоками |
Unpin | Всегда | Можно перемещать после закрепления |
UnwindSafe | Всегда | Безопасность при панике |
RefUnwindSafe | Всегда | Безопасность ссылок при панике |
Freeze | Всегда | Может быть заморожено |
Типы и зависимости
Параметры типа
| Параметр | Ограничения | Назначение |
|---|---|---|
T | Любой тип | Результат выполнения задачи |
M | По умолчанию () | Метаданные задачи |
Связанные типы
| Тип | Назначение |
|---|---|
FallibleTask<T, M> | Задача с обработкой отмены |
Output = T | Тип результата для Future |
Поведение при удалении
| Действие | Результат | Альтернатива |
|---|---|---|
drop(task) | Немедленная отмена | task.detach() |
task.cancel().await | Грациозная отмена | - |
task.detach() | Продолжение выполнения | - |
Особенности многопоточности
is_finished()может изменить состояние сразу после вызова- Отмена задачи пробуждает ее для последнего выполнения
- Исполнитель может уничтожить задачу через
Runnable
Таблица сравнения методов отмены
| Метод | Тип | Возвращает | Поведение |
|---|---|---|---|
drop() | Синхронный | - | Немедленная отмена |
cancel().await | Асинхронный | Option<T> | Грациозная отмена |
detach() | Синхронный | - | Без отмены |
Трейты Future
Реализация Future
| Метод | Сигнатура | Назначение |
|---|---|---|
poll | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> | Опрос готовности результата |
Output | type Output = T | Тип выходного значения |
Безопасность
- Send: Только если
T: SendиM: Send + Sync - Sync: Только если
M: Send + Sync - UnwindSafe: Всегда безопасно для размотки
- Panic Safety: Отмена безопасна при панике
Структура Timer
Источник: Документация async-io::Timer
#![allow(unused)] fn main() { pub struct Timer { /* private fields */ } }
Фьючерс или поток, который эмитирует события по времени.
Описание
Таймеры - это фьючерсы, которые выводят единственный Instant при срабатывании.
Таймеры также являются потоками, которые могут периодически выводить Instant.
Точность
Существует ограничение на максимальную точность, которую может обеспечить Timer. Этот предел зависит от текущей платформы; например, в Windows максимальная точность составляет около 16 миллисекунд. Из-за этого ограничения таймер может спать дольше запрошенной длительности. Он никогда не будет спать меньше.
Примеры
Ожидание 1 секунды:
#![allow(unused)] fn main() { use async_io::Timer; use std::time::Duration; Timer::after(Duration::from_secs(1)).await; }
Таймаут через 1 секунду:
#![allow(unused)] fn main() { use async_io::Timer; use futures_lite::FutureExt; use std::time::Duration; let addrs = async_net::resolve("google.com:80") .or(async { Timer::after(Duration::from_secs(1)).await; Err(std::io::ErrorKind::TimedOut.into()) }) .await?; }
Методы
Создание таймеров
| Метод | Сигнатура | Описание |
|---|---|---|
never | pub fn never() -> Timer | Создает таймер, который никогда не сработает |
after | pub fn after(duration: Duration) -> Timer | Создает таймер, который сработает один раз через указанную длительность |
at | pub fn at(instant: Instant) -> Timer | Создает таймер, который сработает один раз в указанный момент времени |
interval | pub fn interval(period: Duration) -> Timer | Создает таймер, который срабатывает периодически |
interval_at | pub fn interval_at(start: Instant, period: Duration) -> Timer | Создает таймер, который срабатывает периодически, начиная с start |
Управление таймерами
| Метод | Сигнатура | Описание |
|---|---|---|
will_fire | pub fn will_fire(&self) -> bool | Показывает, сработает ли когда-либо этот таймер |
set_after | pub fn set_after(&mut self, duration: Duration) | Устанавливает таймер на срабатывание один раз через указанную длительность |
set_at | pub fn set_at(&mut self, instant: Instant) | Устанавливает таймер на срабатывание один раз в указанный момент |
set_interval | pub fn set_interval(&mut self, period: Duration) | Устанавливает таймер на периодическое срабатывание |
set_interval_at | pub fn set_interval_at(&mut self, start: Instant, period: Duration) | Устанавливает таймер на периодическое срабатывание, начиная с start |
Примеры использования
Таймер, который никогда не срабатывает
#![allow(unused)] fn main() { use async_io::Timer; use futures_lite::prelude::*; use std::time::Duration; async fn run_with_timeout(timeout: Option<Duration>) { let timer = timeout .map(|timeout| Timer::after(timeout)) .unwrap_or_else(Timer::never); run_lengthy_operation().or(timer).await; } // Таймаут через 5 секунд. run_with_timeout(Some(Duration::from_secs(5))).await; // Без таймаута. run_with_timeout(None).await; }
Интервальный таймер
#![allow(unused)] fn main() { use async_io::Timer; use futures_lite::StreamExt; use std::time::{Duration, Instant}; let period = Duration::from_secs(1); Timer::interval(period).next().await; }
Проверка срабатывания таймера
#![allow(unused)] fn main() { use async_io::Timer; use futures_lite::prelude::*; use std::time::Duration; // `never` никогда не сработает. assert!(!Timer::never().will_fire()); // `after` сработает, если длительность не слишком большая. assert!(Timer::after(Duration::from_secs(1)).will_fire()); assert!(!Timer::after(Duration::MAX).will_fire()); // Однако после срабатывания таймер `after` больше никогда не сработает. let mut t = Timer::after(Duration::from_secs(1)); assert!(t.will_fire()); (&mut t).await; assert!(!t.will_fire()); // Интервальные таймеры срабатывают периодически. let mut t = Timer::interval(Duration::from_secs(1)); assert!(t.will_fire()); t.next().await; assert!(t.will_fire()); }
Реализации трейтов
Future
Таймер реализует Future<Output = Instant>, что позволяет использовать его с await:
#![allow(unused)] fn main() { impl Future for Timer { type Output = Instant; fn poll(self: Pin<&mut Timer>, cx: &mut Context<'_>) -> Poll<Instant>; } }
Stream
Таймер также реализует Stream<Item = Instant> для периодического использования:
#![allow(unused)] fn main() { impl Stream for Timer { type Item = Instant; fn poll_next(self: Pin<&mut Timer>, cx: &mut Context<'_>) -> Poll<Option<Instant>>; fn size_hint(&self) -> (usize, Option<usize>); } }
Другие трейты
Debug- отладочное представлениеDrop- управление временем жизниSend,Sync- безопасность для многопоточностиUnpin- можно перемещать после закрепления
Особенности
- Переиспользование wake-ов: Методы
set_*не удаляют waker связанной задачи - Платформенные ограничения: Точность зависит от ОС
- Гибкость: Может использоваться как фьючерс и как поток
Структура Unblock
Источник: Документация blocking::Unblock
#![allow(unused)] fn main() { pub struct Unblock<T> { /* private fields */ } }
Выполняет блокирующий I/O в пуле потоков.
Описание
Блокирующий I/O должен быть изолирован от асинхронного кода. Этот тип перемещает блокирующие I/O операции в специальный пул потоков, предоставляя при этом знакомый асинхронный интерфейс.
Этот тип реализует трейты Stream, AsyncRead, AsyncWrite или AsyncSeek, если внутренний тип реализует Iterator, Read, Write или Seek соответственно.
Особенности
Unblock является низкоуровневым примитивом и имеет некоторые ограничения:
Для более высокоуровневых примитивов, построенных на основе Unblock, используйте async-fs или async-process (на Windows).
Unblock взаимодействует с I/O операциями в пуле потоков через канал (pipe). Это означает, что асинхронная операция чтения/записи просто получает/отправляет некоторые байты из/в канал. В режиме чтения пул потоков читает байты из I/O дескриптора и пересылает их в канал, пока он не заполнится. В режиме записи пул потоков читает байты из канала и пересылает их в I/O дескриптор.
Используйте Unblock::with_capacity() для настройки емкости канала.
Предупреждения
Чтение
Если вы создадите Unblock<Stdin>, прочитаете из него некоторые байты, а затем удалите его, заблокированная операция чтения может продолжать висеть в пуле потоков. Следующая попытка чтения из stdin потеряет байты, прочитанные висящей операцией. Это сложная проблема для решения, поэтому убедитесь, что вы используете только один дескриптор stdin в течение всей программы.
Запись
Если вы записываете данные через трейт AsyncWrite, убедитесь, что выполнили сброс (flush) перед удалением дескриптора Unblock, иначе некоторые буферизированные данные могут быть потеряны.
Поиск (Seeking)
Из-за буферизации в канале, если Unblock оборачивает File, одна операция чтения может переместить курсор файла дальше, чем диапазон операции. Фактически, чтение просто продолжается в фоновом режиме, пока канал не заполнится. Имейте это в виду при использовании AsyncSeek с относительными смещениями.
Примеры
#![allow(unused)] fn main() { use blocking::Unblock; use futures_lite::prelude::*; let mut stdout = Unblock::new(std::io::stdout()); stdout.write_all(b"Hello world!").await?; stdout.flush().await?; }
Методы
Создание
| Метод | Сигнатура | Описание |
|---|---|---|
new | pub fn new(io: T) -> Unblock<T> | Оборачивает блокирующий I/O дескриптор в асинхронный интерфейс Unblock |
with_capacity | pub fn with_capacity(cap: usize, io: T) -> Unblock<T> | Оборачивает с пользовательской емкостью буфера |
Управление
| Метод | Сигнатура | Описание |
|---|---|---|
get_mut | pub async fn get_mut(&mut self) -> &mut T | Получает изменяемую ссылку на блокирующий I/O дескриптор |
with_mut | pub async fn with_mut<R, F>(&mut self, op: F) -> R | Выполняет блокирующую операцию над I/O дескриптором |
into_inner | pub async fn into_inner(self) -> T | Извлекает внутренний блокирующий I/O дескриптор |
Примеры использования
Базовое использование
#![allow(unused)] fn main() { use blocking::Unblock; let stdin = Unblock::new(std::io::stdin()); }
Настройка емкости буфера
#![allow(unused)] fn main() { use blocking::Unblock; let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout()); }
Работа с дескриптором
#![allow(unused)] fn main() { use blocking::{unblock, Unblock}; use std::fs::File; let file = unblock(|| File::create("file.txt")).await?; let mut file = Unblock::new(file); // Получение ссылки на дескриптор let metadata = file.get_mut().await.metadata()?; // Выполнение операции над дескриптором let metadata = file.with_mut(|f| f.metadata()).await?; // Извлечение дескриптора let file = file.into_inner().await; }
Реализации трейтов
AsyncRead
Реализован для T: Read + Send + 'static:
poll_read()- попытка чтения в буферpoll_read_vectored()- векторизованное чтение
AsyncWrite
Реализован для T: Write + Send + 'static:
poll_write()- попытка записи из буфераpoll_flush()- сброс буферовpoll_close()- закрытие объектаpoll_write_vectored()- векторизованная запись
AsyncSeek
Реализован для T: Seek + Send + 'static:
poll_seek()- перемещение по потоку
Stream
Реализован для T: Iterator + Send + 'static:
poll_next()- получение следующего значенияsize_hint()- информация о размере
Другие трейты
Debug- дляT: DebugSend/Sync- при соответствующих ограничениях наTUnpin- можно перемещать после закрепления
Емкости по умолчанию
- Для типов
Iterator: 8192 элементов - Для типов
Read/Write: 8 МБ
Безопасность
!RefUnwindSafe- не безопасен для размотки через ссылку!UnwindSafe- не безопасен для размоткиFreeze- может быть заморожен
Функции
Функция block_on
Источник: Документация async-io::block_on
#![allow(unused)] fn main() { pub fn block_on<T>(future: impl Future<Output = T>) -> T }
Блокирует текущий поток на фьючерсе, обрабатывая I/O события в простое.
Описание
Функция block_on выполняет асинхронный фьючерс в текущем потоке, блокируя его до завершения фьючерса. Во время выполнения она обрабатывает I/O события, когда поток простаивает.
Эта функция предоставляет простой способ запуска асинхронного кода в синхронном контексте.
Примеры
#![allow(unused)] fn main() { use async_io::Timer; use std::time::Duration; async_io::block_on(async { // Этот таймер, вероятно, будет обработан текущим // потоком, а не резервным потоком "async-io". Timer::after(Duration::from_millis(1)).await; }); }
Особенности реализации
- Обработка I/O: Во время ожидания завершения фьючерса функция обрабатывает события ввода-вывода
- Эффективность: Использует текущий поток для обработки, что может быть более эффективно, чем использование отдельного исполнителя
- Универсальность: Может работать с любым фьючерсом, возвращающим значение типа
T
Типичные сценарии использования
Запуск асинхронного кода в синхронном контексте
use async_io::block_on; fn main() { let result = block_on(async { // Асинхронные операции "Результат асинхронной операции" }); println!("{}", result); }
Ожидание таймаутов
#![allow(unused)] fn main() { use async_io::{block_on, Timer}; use std::time::Duration; block_on(async { Timer::after(Duration::from_secs(1)).await; println!("Прошла 1 секунда"); }); }
Совместное использование с другими асинхронными примитивами
#![allow(unused)] fn main() { use async_io::block_on; use async_net::TcpStream; use std::io; block_on(async { match TcpStream::connect("example.com:80").await { Ok(_) => println!("Подключение успешно"), Err(e) => println!("Ошибка подключения: {}", e), } }); }
Отличие от других исполнителей
В отличие от специализированных исполнителей, block_on:
- Работает в текущем потоке без создания дополнительных потоков
- Обрабатывает I/O события только когда фьючерс ожидает
- Подходит для простых сценариев и тестирования
Ограничения
- Блокирует текущий поток до завершения фьючерса
- Не подходит для длительных операций в GUI-приложениях
- Для сложных сценариев предпочтительнее использовать полноценный исполнитель
Параметры
future: Фьючерс, который нужно выполнить. Должен реализовыватьFuture<Output = T>
Возвращаемое значение
- Возвращает результат выполнения фьючерса типа
T
Функция spawn
Источник: Документация smol::spawn
#![allow(unused)] fn main() { pub fn spawn<T: Send + 'static>( future: impl Future<Output = T> + Send + 'static, ) -> Task<T> }
Запускает задачу в глобальном исполнителе (по умолчанию однопоточном).
Описание
Существует глобальный исполнитель, который лениво инициализируется при первом использовании. Он включен в эту библиотеку для удобства при написании модульных тестов и небольших программ, но в остальных случаях более целесообразно создавать собственный Executor.
По умолчанию глобальный исполнитель запускается одним фоновым потоком, но вы также можете настроить количество потоков, установив переменную окружения SMOL_THREADS.
Поскольку исполнитель сохраняется навсегда, метод drop не вызывается для задач при выходе из программы.
Примеры
#![allow(unused)] fn main() { let task = smol::spawn(async { 1 + 2 }); smol::block_on(async { assert_eq!(task.await, 3); }); }
Особенности
Глобальный исполнитель
- Ленивая инициализация: Создается при первом вызове
spawn - Конфигурация потоков: Настраивается через переменную окружения
SMOL_THREADS - Постоянное существование: Исполнитель не уничтожается при завершении программы
Требования к задачам
T: Send + 'static- результат должен быть передаваемым между потоками и иметь статическое время жизниfuture: Send + 'static- фьючерс должен быть передаваемым и иметь статическое время жизни
Примеры использования
Базовая задача
#![allow(unused)] fn main() { use smol::{spawn, block_on}; let task = spawn(async { println!("Задача выполняется"); 42 }); let result = block_on(async { task.await }); assert_eq!(result, 42); }
Параллельные задачи
#![allow(unused)] fn main() { use smol::{spawn, block_on}; let task1 = spawn(async { 1 + 2 }); let task2 = spawn(async { 3 + 4 }); let (result1, result2) = block_on(async { (task1.await, task2.await) }); assert_eq!(result1, 3); assert_eq!(result2, 7); }
Долгоиграющие задачи
#![allow(unused)] fn main() { use smol::{spawn, Timer}; use std::time::Duration; let task = spawn(async { for i in 0..5 { println!("Итерация {}", i); Timer::after(Duration::from_secs(1)).await; } "Завершено" }); // Задача продолжит выполняться в фоне даже без ожидания }
Настройка через переменные окружения
# Запуск с 4 рабочими потоками
SMOL_THREADS=4 cargo run
# Запуск с 1 потоком (по умолчанию)
SMOL_THREADS=1 cargo run
Рекомендации по использованию
Когда использовать
- Тестирование: Удобно для модульных тестов
- Прототипирование: Быстрое создание прототипов
- Небольшие программы: Простые приложения и утилиты
Когда не использовать
- Производственные приложения: Лучше создавать собственный
Executor - Специфичные требования: При необходимости тонкой настройки
- Длительные процессы: Из-за особенностей управления временем жизни
Альтернативы
Для более сложных сценариев рекомендуется использовать:
#![allow(unused)] fn main() { use smol::Executor; let executor = Executor::new(); let task = executor.spawn(async { // ваша задача }); }
Безопасность
- Send + 'static: Гарантирует безопасность между потоками
- Глобальное состояние: Исполнитель существует всю жизнь программы
- Автоматическое управление: Не требуется ручное уничтожение задач
Функция unblock
Источник: Документация blocking::unblock
#![allow(unused)] fn main() { pub fn unblock<T, F>(f: F) -> Task<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static, }
Выполняет блокирующий код в пуле потоков.
Описание
Функция unblock позволяет выполнять блокирующие операции в асинхронном контексте, перемещая их в специальный пул потоков. Это предотвращает блокировку асинхронного исполнителя длительными операциями.
Примеры
Чтение содержимого файла:
#![allow(unused)] fn main() { use blocking::unblock; use std::fs; let contents = unblock(|| fs::read_to_string("file.txt")).await?; }
Запуск процесса:
#![allow(unused)] fn main() { use blocking::unblock; use std::process::Command; let out = unblock(|| Command::new("dir").output()).await?; }
Особенности
Требования к параметрам
F: FnOnce() -> T + Send + 'static- замыкание должно быть выполняемым один раз, передаваемым между потоками и иметь статическое время жизниT: Send + 'static- результат должен быть передаваемым между потоками и иметь статическое время жизни
Пул потоков
- Выделенные потоки: Использует специальный пул потоков для блокирующих операций
- Автоматическое управление: Потоки управляются автоматически
- Изоляция: Предотвращает блокировку асинхронного исполнителя
Примеры использования
Работа с файловой системой
#![allow(unused)] fn main() { use blocking::unblock; use std::fs; // Чтение файла let data = unblock(|| fs::read("data.bin")).await?; // Запись файла unblock(|| fs::write("output.txt", "Hello world")).await?; // Создание директории unblock(|| fs::create_dir_all("path/to/dir")).await?; }
Вычисления в фоне
#![allow(unused)] fn main() { use blocking::unblock; // Ресурсоемкие вычисления let result = unblock(|| { let mut sum = 0; for i in 0..1_000_000 { sum += i; } sum }).await; println!("Сумма: {}", result); }
Сетевые операции
#![allow(unused)] fn main() { use blocking::unblock; use std::net::TcpStream; use std::io::Write; let response = unblock(|| { let mut stream = TcpStream::connect("example.com:80")?; stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")?; let mut response = String::new(); std::io::Read::read_to_string(&mut stream, &mut response)?; Ok::<String, std::io::Error>(response) }).await?; }
Работа с базами данных
#![allow(unused)] fn main() { use blocking::unblock; let users = unblock(|| { // Блокирующие операции с БД // query_database("SELECT * FROM users") vec!["user1", "user2", "user3"] // заглушка }).await; for user in users { println!("Пользователь: {}", user); } }
Совместное использование с другими асинхронными операциями
#![allow(unused)] fn main() { use blocking::unblock; use smol::Timer; use std::time::Duration; async fn process_data() -> Result<(), Box<dyn std::error::Error>> { // Асинхронное ожидание Timer::after(Duration::from_secs(1)).await; // Блокирующая операция let data = unblock(|| std::fs::read_to_string("data.txt")).await?; // Другая асинхронная операция Timer::after(Duration::from_millis(500)).await; // Еще одна блокирующая операция unblock(|| std::fs::write("processed.txt", data.to_uppercase())).await?; Ok(()) } }
Обработка ошибок
#![allow(unused)] fn main() { use blocking::unblock; let result = unblock(|| -> Result<String, std::io::Error> { std::fs::read_to_string("missing_file.txt") }).await; match result { Ok(contents) => println!("Содержимое: {}", contents), Err(e) => println!("Ошибка: {}", e), } }
Преимущества
- Не блокирует исполнитель: Блокирующие операции выполняются в отдельных потоках
- Простота использования: Знакомый синтаксис с
await - Безопасность: Гарантии типов
Send + 'static - Интеграция: Легко комбинируется с другим асинхронным кодом
Альтернативы
Для длительных операций с состоянием рассмотрите использование Unblock<T>:
#![allow(unused)] fn main() { use blocking::Unblock; use std::fs::File; let file = Unblock::new(File::open("data.txt")?); // Множественные операции с файлом через асинхронный интерфейс }