Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Крейт tokio

Источник

Сайт проекта https://tokio.rs/

Среда выполнения для создания надежных сетевых приложений без компромиссов в скорости.

Tokio — это событийно-ориентированная, неблокирующая платформа ввода-вывода для написания асинхронных приложений на языке Rust. На высоком уровне она предоставляет несколько основных компонентов:

  • Инструменты для работы с асинхронными задачами, включая примитивы синхронизации, каналы, таймауты, задержки и интервалы.
  • API для выполнения асинхронного ввода-вывода, включая TCP- и UDP-сокеты, операции с файловой системой, а также управление процессами и сигналами.
  • Среду выполнения для исполнения асинхронного кода, включая планировщик задач, драйвер ввода-вывода, использующий системные очереди событий (epoll, kqueue, IOCP и т.д.), и высокопроизводительный таймер.

Документация на уровне руководства находится на веб-сайте.

Обзор Tokio

Tokio состоит из ряда модулей, которые предоставляют функциональность, необходимую для реализации асинхронных приложений на Rust. В этом разделе мы кратко рассмотрим Tokio, резюмируя основные API и их назначение.

Самый простой способ начать — включить все функции. Для этого используйте флаг full:

tokio = { version = "1", features = ["full"] }

Создание приложений

Tokio отлично подходит для написания приложений, и большинству пользователей в этом случае не стоит слишком беспокоиться о выборе функций. Если вы не уверены, мы рекомендуем использовать full, чтобы избежать препятствий в процессе разработки.

Пример

Этот пример показывает самый быстрый способ начать работу с Tokio.

tokio = { version = "1", features = ["full"] }

Создание библиотек

Как автор библиотеки, вы должны стремиться предоставить максимально легковесный крейт, основанный на Tokio. Для этого следует включать только те функции, которые вам необходимы. Это позволяет пользователям использовать ваш крейт без необходимости включать лишние функции.

Пример

Этот пример показывает, как можно импортировать функции для библиотеки, которой нужно только tokio::spawn и TcpStream.

tokio = { version = "1", features = ["rt", "net"] }

Работа с задачами

Асинхронные программы в Rust основаны на легковесных, неблокирующих единицах выполнения, называемых задачами. Модуль tokio::task предоставляет важные инструменты для работы с задачами:

  • Функцию spawn и тип JoinHandle для планирования новой задачи в среде выполнения Tokio и ожидания результата выполненной задачи соответственно.
  • Функции для запуска блокирующих операций в контексте асинхронной задачи.

Модуль tokio::task доступен только при включенном флаге функции "rt".

Модуль tokio::sync содержит примитивы синхронизации для обмена данными. К ним относятся:

  • Каналы (oneshot, mpsc, watch и broadcast) для отправки значений между задачами.
  • Неблокирующий Mutex для контроля доступа к разделяемому, изменяемому значению.
  • Асинхронный тип Barrier для синхронизации нескольких задач перед началом вычислений.

Модуль tokio::sync доступен только при включенном флаге функции "sync".

Модуль tokio::time предоставляет утилиты для отслеживания времени и планирования работы. Это включает функции для установки таймаутов для задач, откладывания выполнения на будущее или повторения операции через интервалы.

Для использования tokio::time должен быть включен флаг функции "time".

Наконец, Tokio предоставляет среду выполнения для исполнения асинхронных задач. Большинство приложений могут использовать макрос #[tokio::main] для запуска своего кода в среде выполнения Tokio. Однако этот макрос предоставляет только базовые опции конфигурации. В качестве альтернативы, модуль tokio::runtime предоставляет более мощные API для настройки и управления средами выполнения. Вам следует использовать этот модуль, если макрос #[tokio::main] не предоставляет нужной вам функциональности.

Использование среды выполнения требует флагов функций "rt" или "rt-multi-thread" для включения однопоточного планировщика для текущего потока и многопоточного планировщика соответственно. Подробности см. в документации к модулю runtime. Кроме того, флаг функции "macros" включает атрибуты #[tokio::main] и #[tokio::test].

CPU-ограниченные задачи и блокирующий код

Tokio может параллельно выполнять множество задач в нескольких потоках, постоянно переключая выполняемую в данный момент задачу в каждом потоке. Однако такая перестановка может происходить только в точках .await, поэтому код, который долгое время работает без достижения .await, будет блокировать выполнение других задач. Для решения этой проблемы Tokio предоставляет два вида потоков: Основные потоки и Блокирующие потоки.

Основные потоки — это место, где выполняется весь асинхронный код, и по умолчанию Tokio создает по одному такому потоку на каждое ядро CPU. Вы можете использовать переменную окружения TOKIO_WORKER_THREADS для переопределения значения по умолчанию.

Блокирующие потоки создаются по требованию, могут использоваться для запуска блокирующего кода, который в противном случае блокировал бы выполнение других задач, и сохраняются alive в течение некоторого времени бездействия, которое можно настроить с помощью thread_keep_alive. Поскольку Tokio не может вытеснять блокирующие задачи, как это можно делать с асинхронным кодом, верхний предел количества блокирующих потоков очень велик. Эти ограничения можно настроить в Builder.

Для порождения блокирующей задачи следует использовать функцию spawn_blocking.

#[tokio::main]
async fn main() {
    // Этот код выполняется в основном потоке.

    let blocking_task = tokio::task::spawn_blocking(|| {
        // Этот код выполняется в блокирующем потоке.
        // Здесь блокировка допустима.
    });

    // Мы можем ждать завершения блокирующей задачи так:
    // Если блокирующая задача запаникует, unwrap ниже распространит панику.
    blocking_task.await.unwrap();
}

Если ваш код ограничен производительностью CPU и вы хотите ограничить количество потоков для его выполнения, вам следует использовать отдельный пул потоков, предназначенный для CPU-ограниченных задач. Например, вы можете рассмотреть возможность использования библиотеки rayon для таких задач. Также можно создать дополнительную среду выполнения Tokio, предназначенную для CPU-ограниченных задач, но в этом случае следует убедиться, что эта среда выполнения выполняет только CPU-ограниченные задачи, так как задачи, ограниченные вводом-выводом, в этой среде будут работать плохо.

Подсказка: При использовании rayon вы можете использовать канал oneshot для отправки результата обратно в Tokio по завершении задачи rayon.

Асинхронный ввод-вывод

Помимо планирования и выполнения задач, Tokio предоставляет все необходимое для асинхронного выполнения ввода и вывода.

Модуль tokio::io предоставляет основные асинхронные примитивы ввода-вывода Tokio: трейты AsyncRead, AsyncWrite и AsyncBufRead. Кроме того, при включенном флаге функции "io-util" он также предоставляет комбинаторы и функции для работы с этими трейтами, образуя асинхронный аналог std::io.

Tokio также включает API для выполнения различных видов ввода-вывода и асинхронного взаимодействия с операционной системой. К ним относятся:

  • tokio::net — содержит неблокирующие версии TCP, UDP и Unix Domain Sockets (включается флагом "net").
  • tokio::fs — аналогичен std::fs, но для асинхронного выполнения операций с файловой системой (включается флагом "fs").
  • tokio::signal — для асинхронной обработки сигналов ОС Unix и Windows (включается флагом "signal").
  • tokio::process — для порождения и управления дочерними процессами (включается флагом "process").

Примеры

Простой TCP-эхо сервер:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // В цикле читаем данные из сокета и записываем их обратно.
            loop {
                let n = match socket.read(&mut buf).await {
                    // сокет закрыт
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("не удалось прочитать из сокета; err = {:?}", e);
                        return;
                    }
                };

                // Записываем данные обратно
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("не удалось записать в сокет; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

Флаги функций

Tokio использует набор флагов функций для уменьшения количества компилируемого кода. Можно включать только определенные функции. По умолчанию Tokio не включает никакие функции, но позволяет включить их подмножество для конкретного случая использования. Ниже приведен список доступных флагов функций. Вы также можете заметить, что над каждой функцией, структурой и трейтом указан один или несколько флагов функций, необходимых для использования этого элемента. Если вы новичок в Tokio, рекомендуется использовать флаг full, который включает все публичные API. Однако имейте в виду, что это потянет за собой множество дополнительных зависимостей, которые могут вам не понадобиться.

  • full: Включает все функции, перечисленные ниже, кроме test-util и tracing.
  • rt: Включает tokio::spawn, планировщик для текущего потока и утилиты, не связанные с планировщиком.
  • rt-multi-thread: Включает более тяжелый многопоточный планировщик с work-stealing.
  • io-util: Включает трейты-расширения на основе ввода-вывода.
  • io-std: Включает типы Stdout, Stdin и Stderr.
  • net: Включает типы tokio::net, такие как TcpStream, UnixStream и UdpSocket, а также (в Unix-подобных системах) AsyncFd и (в FreeBSD) PollAio.
  • time: Включает типы tokio::time и позволяет планировщикам включить встроенный таймер.
  • process: Включает типы tokio::process.
  • macros: Включает макросы #[tokio::main] и #[tokio::test].
  • sync: Включает все типы tokio::sync.
  • signal: Включает все типы tokio::signal.
  • fs: Включает типы tokio::fs.
  • test-util: Включает инфраструктуру для тестирования среды выполнения Tokio.
  • parking_lot: В качестве потенциальной оптимизации использует примитивы синхронизации из крейта parking_lot внутри. Также эта зависимость необходима для создания некоторых наших примитивов в контексте const. MSRV может увеличиваться в соответствии с используемой версией parking_lot.

Примечание: Трейты AsyncRead и AsyncWrite не требуют никаких флагов и всегда доступны.

Нестабильные функции

Некоторые флаги функций доступны только при указании флага tokio_unstable:

  • tracing: Включает события tracing.

Аналогично, некоторые части API доступны только с тем же флагом:

  • task::Builder
  • Некоторые методы в task::JoinSet
  • runtime::RuntimeMetrics
  • runtime::Builder::on_task_spawn
  • runtime::Builder::on_task_terminate
  • runtime::Builder::unhandled_panic
  • runtime::TaskMeta

Этот флаг включает нестабильные функции. Их публичный API может нарушать семантическое версионирование в релизах 1.x. Чтобы включить эти функции, аргумент --cfg tokio_unstable должен быть передан в rustc при компиляции. Это позволяет явно согласиться на использование функций, которые могут нарушать соглашения semver, поскольку Cargo пока не поддерживает такие соглашения напрямую.

Вы можете указать это в файле .cargo/config.toml вашего проекта:

[build]
rustflags = ["--cfg", "tokio_unstable"]

Раздел [build] указывается не в файле Cargo.toml, а в конфигурационном файле Cargo .cargo/config.toml.

Альтернативно, вы можете указать это с помощью переменной окружения:

# Многие *nix-оболочки:
export RUSTFLAGS="--cfg tokio_unstable"
cargo build

# Windows PowerShell:
$Env:RUSTFLAGS="--cfg tokio_unstable"
cargo build

Поддерживаемые платформы

Tokio в настоящее время гарантированно поддерживает следующие платформы:

  • Linux
  • Windows
  • Android (уровень API 21)
  • macOS
  • iOS
  • FreeBSD

Tokio будет продолжать поддерживать эти платформы в будущем. Однако будущие релизы могут изменить требования, такие как минимальная требуемая версия libc в Linux, уровень API в Android или поддерживаемая версия FreeBSD.

Помимо указанных выше платформ, Tokio предназначен для работы на всех платформах, поддерживаемых крейтом mio. Более длинный список можно найти в документации mio. Однако эти дополнительные платформы могут стать неподдерживаемыми в будущем.

Обратите внимание, что Wine считается платформой, отличной от Windows. Подробнее о поддержке Wine см. в документации mio.

Поддержка WASM

Tokio имеет ограниченную поддержку платформы WASM. Без флага tokio_unstable поддерживаются следующие функции:

  • sync
  • macros
  • io-util
  • rt
  • time

Включение любой другой функции (включая full) приведет к ошибке компиляции.

Модуль time будет работать только на WASM-платформах, которые поддерживают таймеры (например, wasm32-wasi). Функции времени вызовут панику, если используются на WASM-платформе без поддержки таймеров.

Также обратите внимание, что если среда выполнения становится бесконечно бездействующей, она немедленно запаникует вместо вечной блокировки. На платформах, которые не поддерживают время, это означает, что среда выполнения никогда не может простаивать.

Нестабильная поддержка WASM

Tokio также имеет нестабильную поддержку некоторых дополнительных WASM-функций. Это требует использования флага tokio_unstable.

Использование этого флага позволяет использовать tokio::net для цели wasm32-wasi. Однако не все методы доступны для сетевых типов, поскольку WASI в настоящее время не поддерживает создание новых сокетов изнутри WASM. Из-за этого сокеты в настоящее время должны создаваться с помощью трейта FromRawFd.

Реэкспорты

#![allow(unused)]
fn main() {
pub use task::spawn; // rt
}

Модули

ИмяФлагиОписание
fsfsАсинхронные файловые утилиты.
ioТрейты, вспомогательные средства и определения типов для асинхронной функциональности ввода-вывода.
netnetПривязки TCP/UDP/Unix для tokio.
processprocessРеализация асинхронного управления процессами для Tokio.
runtimertСреда выполнения Tokio.
signalsignalАсинхронная обработка сигналов для Tokio.
syncsyncПримитивы синхронизации для использования в асинхронных контекстах.
taskrtАсинхронные "зеленые потоки" (задачи).
timetimeУтилиты для отслеживания времени.

Макросы

ИмяФлагиОписание
joinmacrosОжидает завершения нескольких параллельных ветвей.
pinЗакрепляет значение в стеке.
selectmacrosОжидает завершения первой из нескольких параллельных ветвей, отменяя остальные.
task_localrtОбъявляет новый задачно-локальный ключ типа tokio::task::LocalKey.
try_joinmacrosОжидает успешного завершения всех параллельных ветвей или первой ошибки.

Атрибут-макросы

ИмяФлагиОписание
mainrt и macrosПомечает асинхронную функцию для выполнения выбранной средой выполнения. Помогает настроить Runtime без прямого использования Runtime или Builder.
testrt и macrosПомечает асинхронную функцию для выполнения средой выполнения, подходящей для тестов. Помогает настроить Runtime без прямого использования Runtime или Builder.

Макросы

Макрос join

#![allow(unused)]
fn main() {
macro_rules! join {
    ($(biased;)? $($future:expr),*) => { ... };
}
}

Доступно только с флагом функции macros.

Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда все ветви завершатся.

Описание

Макрос join! должен использоваться внутри асинхронных функций, замыканий и блоков.

Макрос join! принимает список асинхронных выражений и выполняет их конкурентно в одной и той же задаче. Каждое асинхронное выражение вычисляется в future, и future из каждого выражения мультиплексируются в текущей задаче.

При работе с асинхронными выражениями, возвращающими Result, join! будет ждать завершения всех ветвей независимо от того, завершилась ли какая-либо из них с Err. Используйте try_join! для досрочного возврата при обнаружении Err.

Примечания

Предоставленные future хранятся inline и не требуют выделения Vec.

Характеристики времени выполнения

Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в join!.

Честность (Fairness)

По умолчанию сгенерированный future макроса join! чередует порядок опроса содержащихся future при каждом пробуждении.

Это поведение можно переопределить, добавив biased; в начало использования макроса. Подробности см. в примерах. Это заставит join опрашивать future в порядке их следования сверху вниз.

Это может быть полезно, если ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение.

Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы объединяете поток и future завершения, и поток имеет огромный объем сообщений, обработка которых занимает много времени за один опрос, вам следует поместить future завершения раньше в списке join!, чтобы гарантировать, что он всегда будет опрашиваться и не будет задерживаться из-за того, что future потока долго возвращает Poll::Pending.

Примеры

Базовое объединение с двумя ветвями

#![allow(unused)]
fn main() {
async fn do_stuff_async() {
    // асинхронная работа
}

async fn more_async_work() {
    // больше асинхронной работы
}

let (first, second) = tokio::join!(
    do_stuff_async(),
    more_async_work()
);

// делаем что-то со значениями
}

Использование режима biased; для контроля порядка опроса

#![allow(unused)]
fn main() {
async fn do_stuff_async() {
    // асинхронная работа
}

async fn more_async_work() {
    // больше асинхронной работы
}

let (first, second) = tokio::join!(
    biased;
    do_stuff_async(),
    more_async_work()
);

// делаем что-то со значениями
}

Обработка результатов с ошибками

#![allow(unused)]
fn main() {
async fn fetch_data() -> Result<String, reqwest::Error> {
    // получение данных
}

async fn process_file() -> Result<Vec<u8>, std::io::Error> {
    // обработка файла
}

// join! будет ждать завершения обеих ветвей, даже если одна вернет Err
let (data_result, file_result) = tokio::join!(
    fetch_data(),
    process_file()
);

// Обрабатываем результаты независимо
match data_result {
    Ok(data) => println!("Данные получены: {}", data),
    Err(e) => println!("Ошибка получения данных: {}", e),
}

match file_result {
    Ok(file) => println!("Файл обработан, размер: {} байт", file.len()),
    Err(e) => println!("Ошибка обработки файла: {}", e),
}
}

Использование с разными типами возвращаемых значений

#![allow(unused)]
fn main() {
async fn get_number() -> i32 { 42 }
async fn get_text() -> &'static str { "hello" }
async fn get_vector() -> Vec<u8> { vec![1, 2, 3] }

let (number, text, vector) = tokio::join!(
    get_number(),
    get_text(),
    get_vector()
);

println!("Number: {}, Text: {}, Vector: {:?}", number, text, vector);
}

Макрос select

#![allow(unused)]
fn main() {
macro_rules! select {
    {
        $(
            biased;
        )?
        $(
            $bind:pat = $fut:expr $(, if $cond:expr)? => $handler:expr,
        )*
        $(
            else => $els:expr $(,)?
        )?
    } => { ... };
}
}

Доступно только с флагом функции macros.

Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда первая ветвь завершается, отменяя оставшиеся ветви.

Описание

Макрос select! должен использоваться внутри асинхронных функций, замыканий и блоков.

Макрос select! принимает одну или несколько ветвей со следующим шаблоном:

<шаблон> = <асинхронное выражение> (, if <предусловие>)? => <обработчик>,

Дополнительно макрос select! может включать единственную необязательную ветвь else, которая выполняется, если ни одна из других ветвей не соответствует своим шаблонам:

else => <выражение>

Макрос агрегирует все выражения <асинхронное выражение> и выполняет их конкурентно в текущей задаче. Как только первое выражение завершается со значением, соответствующим его <шаблону>, макрос select! возвращает результат вычисления выражения <обработчик> завершенной ветви.

Кроме того, каждая ветвь может включать необязательное предусловие if. Если предусловие возвращает false, то ветвь отключается. Предоставленное <асинхронное выражение> все равно вычисляется, но результирующий future никогда не опрашивается. Эта возможность полезна при использовании select! в цикле.

Полный жизненный цикл выражения select! выглядит следующим образом:

  1. Вычислить все предоставленные выражения <предусловие>. Если предусловие возвращает false, отключить ветвь до конца текущего вызова select!. Повторный вход в select! из-за цикла сбрасывает состояние "отключено".
  2. Агрегировать <асинхронное выражение> из каждой ветви, включая отключенные. Если ветвь отключена, <асинхронное выражение> все равно вычисляется, но результирующий future не опрашивается.
  3. Если все ветви отключены: перейти к шагу 6.
  4. Конкурентно ожидать результаты всех оставшихся <асинхронное выражение>.
  5. Как только <асинхронное выражение> возвращает значение, попытаться применить значение к предоставленному <шаблону>. Если шаблон совпадает, вычислить <обработчик> и вернуть результат. Если шаблон не совпадает, отключить текущую ветвь до конца текущего вызова select!. Продолжить с шага 3.
  6. Вычислить выражение else. Если выражение else не предоставлено, вызвать панику.

Характеристики времени выполнения

Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в select!.

Честность (Fairness)

По умолчанию select! случайным образом выбирает ветвь для проверки первой. Это обеспечивает некоторый уровень честности при вызове select! в цикле с ветвями, которые всегда готовы.

Это поведение можно переопределить, добавив biased; в начало использования макроса. Это заставит select опрашивать future в порядке их следования сверху вниз. Есть несколько причин, по которым это может понадобиться:

  • Генерация случайных чисел в tokio::select! имеет ненулевую стоимость CPU
  • Ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение

Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы выбираете между потоком и future завершения, и поток имеет огромный объем сообщений и нулевое или почти нулевое время между ними, вам следует поместить future завершения раньше в списке select!, чтобы гарантировать, что он всегда будет опрашиваться и не будет проигнорирован из-за того, что поток постоянно готов.

Паника

Макрос select! вызывает панику, если все ветви отключены и нет предоставленной ветви else. Ветвь отключается, когда предоставленное предусловие if возвращает false или когда шаблон не соответствует результату <асинхронное выражение>.

Безопасность отмены (Cancellation Safety)

При использовании select! в цикле для получения сообщений из нескольких источников следует убедиться, что вызов получения безопасен при отмене, чтобы избежать потери сообщений. В этом разделе рассматриваются различные распространенные методы и описывается, являются ли они безопасными при отмене. Списки в этом разделе не являются исчерпывающими.

Следующие методы безопасны при отмене:

  • tokio::sync::mpsc::Receiver::recv
  • tokio::sync::mpsc::UnboundedReceiver::recv
  • tokio::sync::broadcast::Receiver::recv
  • tokio::sync::watch::Receiver::changed
  • tokio::net::TcpListener::accept
  • tokio::net::UnixListener::accept
  • tokio::signal::unix::Signal::recv
  • tokio::io::AsyncReadExt::read на любом AsyncRead
  • tokio::io::AsyncReadExt::read_buf на любом AsyncRead
  • tokio::io::AsyncWriteExt::write на любом AsyncWrite
  • tokio::io::AsyncWriteExt::write_buf на любом AsyncWrite
  • tokio_stream::StreamExt::next на любом Stream
  • futures::stream::StreamExt::next на любом Stream

Следующие методы не безопасны при отмене и могут привести к потере данных:

  • tokio::io::AsyncReadExt::read_exact
  • tokio::io::AsyncReadExt::read_to_end
  • tokio::io::AsyncReadExt::read_to_string
  • tokio::io::AsyncWriteExt::write_all

Следующие методы не безопасны при отмене, потому что они используют очередь для честности, и отмена заставляет вас потерять свое место в очереди:

  • tokio::sync::Mutex::lock
  • tokio::sync::RwLock::read
  • tokio::sync::RwLock::write
  • tokio::sync::Semaphore::acquire
  • tokio::sync::Notify::notified

Чтобы определить, являются ли ваши собственные методы безопасными при отмене, ищите места использования .await. Это связано с тем, что когда асинхронный метод отменяется, это всегда происходит в .await. Если ваша функция ведет себя корректно, даже если она перезапускается во время ожидания в .await, то она безопасна при отмене.

Безопасность отмены можно определить следующим образом: если у вас есть future, который еще не завершился, то удаление этого future и его повторное создание должно быть no-op операцией. Это определение мотивировано ситуацией, когда select! используется в цикле. Без этой гарантии вы потеряете свой прогресс, когда другая ветвь завершится и вы перезапустите select!, пройдя по циклу.

Имейте в виду, что отмена чего-либо, что не является безопасным при отмене, не обязательно является ошибкой. Например, если вы отменяете задачу, потому что приложение завершает работу, то вам, вероятно, все равно, что частично прочитанные данные будут потеряны.

Примеры

Базовый select с двумя ветвями

#![allow(unused)]
fn main() {
async fn do_stuff_async() {
    // асинхронная работа
}

async fn more_async_work() {
    // больше асинхронной работы
}

tokio::select! {
    _ = do_stuff_async() => {
        println!("do_stuff_async() завершился первым")
    }
    _ = more_async_work() => {
        println!("more_async_work() завершился первым")
    }
};
}

Базовый выбор между потоками

#![allow(unused)]
fn main() {
use tokio_stream::{self as stream, StreamExt};

let mut stream1 = stream::iter(vec![1, 2, 3]);
let mut stream2 = stream::iter(vec![4, 5, 6]);

let next = tokio::select! {
    v = stream1.next() => v.unwrap(),
    v = stream2.next() => v.unwrap(),
};

assert!(next == 1 || next == 4);
}

Сбор содержимого двух потоков

#![allow(unused)]
fn main() {
use tokio_stream::{self as stream, StreamExt};

let mut stream1 = stream::iter(vec![1, 2, 3]);
let mut stream2 = stream::iter(vec![4, 5, 6]);

let mut values = vec![];

loop {
    tokio::select! {
        Some(v) = stream1.next() => values.push(v),
        Some(v) = stream2.next() => values.push(v),
        else => break,
    }
}

values.sort();
assert_eq!(&[1, 2, 3, 4, 5, 6], &values[..]);
}

Использование с таймаутом

#![allow(unused)]
fn main() {
use tokio_stream::{self as stream, StreamExt};
use tokio::time::{self, Duration};

let mut stream = stream::iter(vec![1, 2, 3]);
let sleep = time::sleep(Duration::from_secs(1));
tokio::pin!(sleep);

loop {
    tokio::select! {
        maybe_v = stream.next() => {
            if let Some(v) = maybe_v {
                println!("получено = {}", v);
            } else {
                break;
            }
        }
        _ = &mut sleep => {
            println!("таймаут");
            break;
        }
    }
}
}

Объединение двух значений с использованием select!

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

let (tx1, mut rx1) = oneshot::channel();
let (tx2, mut rx2) = oneshot::channel();

tokio::spawn(async move {
    tx1.send("first").unwrap();
});

tokio::spawn(async move {
    tx2.send("second").unwrap();
});

let mut a = None;
let mut b = None;

while a.is_none() || b.is_none() {
    tokio::select! {
        v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()),
        v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()),
    }
}

let res = (a.unwrap(), b.unwrap());
assert_eq!(res.0, "first");
assert_eq!(res.1, "second");
}

Использование режима biased; для контроля порядка опроса

#![allow(unused)]
fn main() {
let mut count = 0u8;

loop {
    tokio::select! {
        // Если запустить этот пример без `biased;`, порядок опроса
        // псевдослучайный, и проверки значения count (вероятно) провалятся.
        biased;

        _ = async {}, if count < 1 => {
            count += 1;
            assert_eq!(count, 1);
        }
        _ = async {}, if count < 2 => {
            count += 1;
            assert_eq!(count, 2);
        }
        _ = async {}, if count < 3 => {
            count += 1;
            assert_eq!(count, 3);
        }
        _ = async {}, if count < 4 => {
            count += 1;
            assert_eq!(count, 4);
        }
        else => {
            break;
        }
    };
}
}

Избегание гонок в предусловиях if

Поскольку предусловия if используются для отключения ветвей select!, следует проявлять осторожность, чтобы избежать пропуска значений.

Например, вот неправильное использование sleep с if. Цель - повторно выполнять асинхронную задачу до 50 миллисекунд. Однако существует вероятность пропуска завершения сна. ⓘ

#![allow(unused)]
fn main() {
use tokio::time::{self, Duration};

async fn some_async_work() {
    // выполняем работу
}

let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);

while !sleep.is_elapsed() {
    tokio::select! {
        _ = &mut sleep, if !sleep.is_elapsed() => {
            println!("операция превысила время ожидания");
        }
        _ = some_async_work() => {
            println!("операция завершена");
        }
    }
}
}

В приведенном выше примере sleep.is_elapsed() может возвращать true, даже если sleep.poll() никогда не возвращал Ready. Это создает потенциальное состояние гонки, когда sleep истекает между проверкой while !sleep.is_elapsed() и вызовом select!, в результате чего вызов some_async_work() выполняется беспрепятственно, несмотря на истечение времени сна.

Один из способов написать приведенный выше пример без гонки:

#![allow(unused)]
fn main() {
use tokio::time::{self, Duration};

async fn some_async_work() {
    // выполняем работу
}

let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);

loop {
    tokio::select! {
        _ = &mut sleep => {
            println!("операция превысила время ожидания");
            break;
        }
        _ = some_async_work() => {
            println!("операция завершена");
        }
    }
}
}

Альтернативы из экосистемы

Макрос select! - это мощный инструмент для управления несколькими асинхронными ветвями, позволяющий задачам выполняться конкурентно в одном потоке. Однако его использование может создавать сложности, особенно связанные с безопасностью отмены, что может привести к трудноуловимым и сложным для отладки ошибкам. Для многих случаев использования альтернативы из экосистемы могут быть предпочтительнее, поскольку они смягчают эти проблемы, предлагая более четкий синтаксис, более предсказуемое управление потоком и уменьшая необходимость ручного решения таких проблем, как семантика предохранителей или безопасность отмены.

Объединение потоков (Merging Streams)

Для случаев, когда loop { select! { ... } } используется для опроса нескольких задач, объединение потоков предлагает краткую альтернативу, которая по своей природе обрабатывает безопасную при отмене обработку, устраняя риск потери данных. Библиотеки, такие как tokio_stream, futures::stream и futures_concurrency, предоставляют инструменты для объединения потоков и последовательной обработки их выходных данных.

Гонка Future (Racing Futures)

Если вам нужно дождаться первого завершения среди нескольких асинхронных задач, утилиты экосистемы, такие как futures, futures-lite или futures-concurrency, предоставляют упрощенный синтаксис для гонки future:

  • futures_concurrency::future::Race
  • futures::select
  • futures::stream::select_all (для потоков)
  • futures_lite::future::or
  • futures_lite::future::race
#![allow(unused)]
fn main() {
use futures_concurrency::future::Race;

let task_a = async { Ok("ok") };
let task_b = async { Err("error") };
let result = (task_a, task_b).race().await;

match result {
    Ok(output) => println!("Первая задача завершилась с: {output}"),
    Err(err) => eprintln!("Произошла ошибка: {err}"),
}
}

Macro task_local

#![allow(unused)]
fn main() {
macro_rules! task_local {
    () => { ... };
    ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { ... };
    ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { ... };
}
}

Доступно только в crate feature rt.

Объявляет новый локальный ключ типа tokio::task::LocalKey

Синтакс

Макрос оборачивает любое количество статических объявлений и делает их локальными для текущей задачи. Публичность и атрибуты для каждого выражения сохраняются. Например:

Пример

#![allow(unused)]
fn main() {
task_local! {
    pub static ONE: u32;

    #[allow(unused)]
    static TWO: f32;
}
}

См. LocalKey документацию для большей информации.

Макрос pin

#![allow(unused)]
fn main() {
macro_rules! pin {
    ($($x:ident),*) => { ... };
    ($(
            let $x:ident = $init:expr;
    )*) => { ... };
}
}

Закрепляет значение в стеке.

Описание

Вызовы async fn возвращают анонимные значения Future, которые являются !Unpin. Эти значения должны быть закреплены до того, как их можно будет опросить. Вызов .await обработает это, но потребляет future. Если требуется вызвать .await на ссылке &mut _, вызывающая сторона отвечает за закрепление future.

Закрепление может быть выполнено путем выделения памяти с помощью Box::pin или использования стека с помощью макроса pin!.

Следующий код не скомпилируется: ⓘ

async fn my_async_fn() {
    // асинхронная логика здесь
}

#[tokio::main]
async fn main() {
    let mut future = my_async_fn();
    (&mut future).await; // Ошибка компиляции!
}

Чтобы это работало, требуется закрепление:

#![allow(unused)]
fn main() {
use tokio::pin;

async fn my_async_fn() {
    // асинхронная логика здесь
}

let future = my_async_fn();
pin!(future);

(&mut future).await; // Теперь работает
}

Закрепление полезно при использовании select! и операторов потоков, которые требуют T: Stream + Unpin.

Использование

Макрос pin! принимает идентификаторы в качестве аргументов. Он не работает с выражениями.

Следующее не компилируется, так как выражение передается в pin!: ⓘ

async fn my_async_fn() {
    // асинхронная логика здесь
}

#[tokio::main]
async fn main() {
    let mut future = pin!(my_async_fn()); // Ошибка: ожидается идентификатор
    (&mut future).await;
}

Примеры

Использование с select!

#![allow(unused)]
fn main() {
use tokio::{pin, select};
use tokio_stream::{self as stream, StreamExt};

async fn my_async_fn() {
    // асинхронная логика здесь
}

let mut stream = stream::iter(vec![1, 2, 3, 4]);

let future = my_async_fn();
pin!(future);

loop {
    select! {
        _ = &mut future => {
            // Прекращаем цикл после завершения future
            break;
        }
        Some(val) = stream.next() => {
            println!("получено значение = {}", val);
        }
    }
}
}

Одновременное объявление переменной и закрепление

Поскольку присваивание переменной с последующим закреплением является распространенной операцией, существует вариант макроса, который поддерживает выполнение обоих действий за один раз.

#![allow(unused)]
fn main() {
use tokio::{pin, select};

async fn my_async_fn() {
    // асинхронная логика здесь
}

pin! {
    let future1 = my_async_fn();
    let future2 = my_async_fn();
}

select! {
    _ = &mut future1 => {}
    _ = &mut future2 => {}
}
}

Закрепление нескольких значений

#![allow(unused)]
fn main() {
use tokio::pin;

async fn task_one() -> i32 { 42 }
async fn task_two() -> &'static str { "hello" }

let fut1 = task_one();
let fut2 = task_two();

pin!(fut1, fut2);

// Теперь можно использовать &mut fut1 и &mut fut2 с .await
}

Практический пример с обработкой ошибок

use tokio::{pin, select};
use std::time::Duration;

async fn network_request() -> Result<String, &'static str> {
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok("данные получены".to_string())
}

async fn timeout() -> Result<(), &'static str> {
    tokio::time::sleep(Duration::from_secs(2)).await;
    Err("таймаут")
}

#[tokio::main]
async fn main() {
    let request_future = network_request();
    let timeout_future = timeout();
    
    pin! {
        let request = request_future;
        let timeout = timeout_future;
    }
    
    select! {
        result = &mut request => {
            match result {
                Ok(data) => println!("Успех: {}", data),
                Err(e) => println!("Ошибка запроса: {}", e),
            }
        }
        _ = &mut timeout => {
            println!("Превышено время ожидания запроса");
        }
    }
}

Примечания

  • Закрепление в стеке более эффективно, чем использование Box::pin, так как избегает выделения памяти в куче
  • Закрепленные значения могут быть безопасно разыменованы для получения &mut T только через специальные интерфейсы
  • Макрос pin! гарантирует, что закрепленное значение не может быть перемещено после закрепления

Макрос try_join

#![allow(unused)]
fn main() {
macro_rules! try_join {
    ($(biased;)? $($future:expr),*) => { ... };
}
}

Доступно только с флагом функции macros.

Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда все ветви завершаются с Ok(_) или при первой Err(_).

Описание

Макрос try_join! должен использоваться внутри асинхронных функций, замыканий и блоков.

Аналогично join!, макрос try_join! принимает список асинхронных выражений и выполняет их конкурентно в одной и той же задаче. Каждое асинхронное выражение вычисляется в future, и future из каждого выражения мультиплексируются в текущей задаче. Макрос try_join! возвращает результат, когда все ветви возвращают Ok или когда первая ветвь возвращает Err.

Примечания

Предоставленные future хранятся inline и не требуют выделения Vec.

Характеристики времени выполнения

Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в try_join!.

Честность (Fairness)

По умолчанию сгенерированный future макроса try_join! чередует порядок опроса содержащихся future при каждом пробуждении.

Это поведение можно переопределить, добавив biased; в начало использования макроса. Это заставит try_join опрашивать future в порядке их следования сверху вниз.

Это может быть полезно, если ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение.

Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы объединяете поток и future завершения, и поток имеет огромный объем сообщений, обработка которых занимает много времени за один опрос, вам следует поместить future завершения раньше в списке try_join!, чтобы гарантировать, что он всегда будет опрашиваться и не будет задерживаться из-за того, что future потока долго возвращает Poll::Pending.

Примеры

Базовый try_join с двумя ветвями

#![allow(unused)]
fn main() {
async fn do_stuff_async() -> Result<(), &'static str> {
    // асинхронная работа
    Ok(())
}

async fn more_async_work() -> Result<(), &'static str> {
    // больше асинхронной работы
    Ok(())
}

let res = tokio::try_join!(
    do_stuff_async(),
    more_async_work()
);

match res {
    Ok((first, second)) => {
        // делаем что-то со значениями
        println!("Обе операции завершились успешно");
    }
    Err(err) => {
        println!("Обработка завершилась ошибкой; ошибка = {}", err);
    }
}
}

Использование try_join! с порожденными задачами

#![allow(unused)]
fn main() {
use tokio::task::JoinHandle;

async fn do_stuff_async() -> Result<String, &'static str> {
    // асинхронная работа
    Ok("результат".to_string())
}

async fn more_async_work() -> Result<i32, &'static str> {
    // больше асинхронной работы
    Ok(42)
}

async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
    match handle.await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(err)) => Err(err),
        Err(err) => Err("обработка задачи завершилась ошибкой"),
    }
}

let handle1 = tokio::spawn(do_stuff_async());
let handle2 = tokio::spawn(more_async_work());

match tokio::try_join!(flatten(handle1), flatten(handle2)) {
    Ok((text, number)) => {
        println!("Успех: текст = {}, число = {}", text, number);
    }
    Err(err) => {
        println!("Не удалось выполнить с ошибкой: {}.", err);
    }
}
}

Использование режима biased; для контроля порядка опроса

#![allow(unused)]
fn main() {
async fn do_stuff_async() -> Result<(), &'static str> {
    // асинхронная работа
    Ok(())
}

async fn more_async_work() -> Result<(), &'static str> {
    // больше асинхронной работы
    Ok(())
}

let res = tokio::try_join!(
    biased;
    do_stuff_async(),
    more_async_work()
);

match res {
    Ok((first, second)) => {
        // делаем что-то со значениями
    }
    Err(err) => {
        println!("обработка завершилась ошибкой; ошибка = {}", err);
    }
}
}

Практический пример с различными типами результатов

#![allow(unused)]
fn main() {
async fn fetch_user() -> Result<String, &'static str> {
    // имитация запроса к API
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    Ok("Иван Иванов".to_string())
}

async fn fetch_balance() -> Result<f64, &'static str> {
    // имитация запроса к базе данных
    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
    Ok(1000.50)
}

async fn validate_session() -> Result<bool, &'static str> {
    // проверка сессии
    Ok(true)
}

let result = tokio::try_join!(
    fetch_user(),
    fetch_balance(),
    validate_session()
);

match result {
    Ok((user, balance, is_valid)) => {
        println!("Пользователь: {}, Баланс: {:.2}, Сессия действительна: {}", 
                 user, balance, is_valid);
    }
    Err(err) => {
        eprintln!("Ошибка при загрузке данных: {}", err);
    }
}
}

Обработка ошибок с досрочным завершением

#![allow(unused)]
fn main() {
async fn process_step1() -> Result<String, &'static str> {
    // шаг 1 обработки
    Ok("шаг1 завершен".to_string())
}

async fn process_step2() -> Result<i32, &'static str> {
    // шаг 2 обработки - имитация ошибки
    Err("ошибка на шаге 2")
}

async fn process_step3() -> Result<bool, &'static str> {
    // этот шаг не будет выполнен из-за ошибки в step2
    Ok(true)
}

let result = tokio::try_join!(
    process_step1(),
    process_step2(),
    process_step3()
);

match result {
    Ok(_) => {
        println!("Все шаги завершены успешно");
    }
    Err(err) => {
        println!("Обработка прервана на шаге с ошибкой: {}", err);
        // step3 не будет выполнен из-за досрочного завершения
    }
}
}

Использование с разными типами ошибок

#![allow(unused)]
fn main() {
use std::convert::From;

// Функция для приведения разных типов ошибок к общему типу
fn map_error<E: ToString>(err: E) -> String {
    err.to_string()
}

async fn api_call() -> Result<String, reqwest::Error> {
    // вызов API
    Ok("данные API".to_string())
}

async fn db_query() -> Result<i32, sqlx::Error> {
    // запрос к базе данных
    Ok(42)
}

let result = tokio::try_join!(
    api_call().map_err(map_error),
    db_query().map_err(map_error)
);

match result {
    Ok((api_data, db_data)) => {
        println!("API данные: {}, DB данные: {}", api_data, db_data);
    }
    Err(err) => {
        eprintln!("Ошибка: {}", err);
    }
}
}

Модули

Модуль doc

Доступно только на docs.rs и Unix системах.

Типы, которые документированы локально в крейте Tokio, но фактически не находятся здесь.

Обратите внимание, что этот модуль виден только на docs.rs, вы не можете использовать его напрямую в своем коде.

Модули

ИмяФлагиОписание
osnet или fsСм. std::os.

Перечисления

ИмяОписание
NotDefinedHereИмя типа, который не определен здесь.

Примечание по использованию

Этот модуль существует исключительно для целей документации и не предназначен для прямого использования в коде. Он служит для организации документации на сайте docs.rs и предоставления перекрестных ссылок между различными компонентами Tokio.

Фактические реализации типов, упомянутых в этом модуле, находятся в других местах крейта Tokio или в стандартной библиотеке Rust.

Модуль fs

Доступно только с флагом функции fs.

Асинхронные файловые утилиты.

Этот модуль содержит служебные методы для работы с файловой системой асинхронно. Это включает чтение/запись файлов и работу с каталогами.

Важное замечание

Имейте в виду, что большинство операционных систем не предоставляют асинхронные API файловой системы. Из-за этого Tokio будет использовать обычные блокирующие файловые операции "за кулисами". Это делается с использованием пула потоков spawn_blocking для их выполнения в фоновом режиме.

Модуль tokio::fs следует использовать только для обычных файлов. Попытка использовать его с, например, именованным каналом в Linux может привести к неожиданному поведению, такому как зависания при завершении работы среды выполнения. Для специальных файлов следует использовать специальные типы, такие как tokio::net::unix::pipe или AsyncFd.

В настоящее время Tokio всегда использует spawn_blocking на всех платформах, но в будущем это может быть изменено на использование асинхронных API файловой системы, таких как io_uring.

Использование

Самый простой способ использовать этот модуль — использовать служебные функции, которые работают с целыми файлами:

  • tokio::fs::read
  • tokio::fs::read_to_string
  • tokio::fs::write

Две функции чтения считывают весь файл и возвращают его содержимое. Функция записи принимает содержимое файла и записывает это содержимое в файл. Она перезаписывает существующий файл, если он есть.

Например, чтобы прочитать файл:

#![allow(unused)]
fn main() {
let contents = tokio::fs::read_to_string("my_file.txt").await?;

println!("Файл содержит {} строк.", contents.lines().count());
}

Чтобы перезаписать файл:

#![allow(unused)]
fn main() {
let contents = "Первая строка.\nВторая строка.\nТретья строка.\n";

tokio::fs::write("my_file.txt", contents.as_bytes()).await?;
}

Использование File

Основной тип для взаимодействия с файлами — File. Он может использоваться для чтения и записи данного файла. Это делается с использованием трейтов AsyncRead и AsyncWrite. Этот тип обычно используется, когда вы хотите сделать что-то более сложное, чем просто прочитать или записать все содержимое за один раз.

Примечание: Важно использовать flush при записи в файл Tokio. Это связано с тем, что вызовы write возвращаются до завершения записи, а flush будет ждать завершения записи. (Запись произойдет, даже если вы не вызываете flush; это просто произойдет позже.) Это отличается от std::fs::File и связано с тем, что File использует spawn_blocking за кулисами.

Например, чтобы подсчитать количество строк в файле без загрузки всего файла в память:

#![allow(unused)]
fn main() {
use tokio::fs::File;
use tokio::io::AsyncReadExt;

let mut file = File::open("my_file.txt").await?;

let mut chunk = vec![0; 4096];
let mut number_of_lines = 0;
loop {
    let len = file.read(&mut chunk).await?;
    if len == 0 {
        // Длина ноль означает конец файла.
        break;
    }
    for &b in &chunk[..len] {
        if b == b'\n' {
            number_of_lines += 1;
        }
    }
}

println!("Файл содержит {} строк.", number_of_lines);
}

Например, чтобы записать файл построчно:

#![allow(unused)]
fn main() {
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

let mut file = File::create("my_file.txt").await?;

file.write_all(b"Первая строка.\n").await?;
file.write_all(b"Вторая строка.\n").await?;
file.write_all(b"Третья строка.\n").await?;

// Не забудьте вызвать `flush` после записи!
file.flush().await?;
}

Настройка производительности файлового ввода-вывода

Файлы Tokio используют spawn_blocking за кулисами, и это имеет серьезные последствия для производительности. Чтобы добиться хорошей производительности при файловом вводе-выводе в Tokio, рекомендуется объединять ваши операции в как можно меньше вызовов spawn_blocking.

Один пример этого различия можно увидеть, сравнив два приведенных выше примера чтения. Первый пример использует tokio::fs::read, который считывает весь файл за один вызов spawn_blocking, а затем возвращает его. Второй пример будет читать файл частями, используя множество вызовов spawn_blocking. Это означает, что второй пример, скорее всего, будет более затратным для больших файлов. (Конечно, использование частей может быть необходимо для очень больших файлов, которые не помещаются в памяти.)

Следующие примеры покажут некоторые стратегии для этого:

Запись всего содержимого за один раз

При создании файла записывайте данные в String или Vec<u8>, а затем записывайте весь файл одним вызовом spawn_blocking с помощью tokio::fs::write.

#![allow(unused)]
fn main() {
let mut contents = String::new();

contents.push_str("Первая строка.\n");
contents.push_str("Вторая строка.\n");
contents.push_str("Третья строка.\n");

tokio::fs::write("my_file.txt", contents.as_bytes()).await?;
}

Использование буферизации

Используйте BufReader и BufWriter для буферизации множества мелких операций чтения или записи в несколько крупных. Этот пример, скорее всего, выполнит только один вызов spawn_blocking.

#![allow(unused)]
fn main() {
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};

let mut file = BufWriter::new(File::create("my_file.txt").await?);

file.write_all(b"Первая строка.\n").await?;
file.write_all(b"Вторая строка.\n").await?;
file.write_all(b"Третья строка.\n").await?;

// Из-за BufWriter фактическая запись и вызов spawn_blocking
// происходят при вызове flush.
file.flush().await?;
}

Ручное использование std::fs внутри spawn_blocking

#![allow(unused)]
fn main() {
use std::fs::File;
use std::io::{self, Write};
use tokio::task::spawn_blocking;

spawn_blocking(move || {
    let mut file = File::create("my_file.txt")?;

    file.write_all(b"Первая строка.\n")?;
    file.write_all(b"Вторая строка.\n")?;
    file.write_all(b"Третья строка.\n")?;

    // В отличие от файла Tokio, файлу std::fs
    // не нужен flush.

    io::Result::Ok(())
}).await.unwrap()?;
}

Также полезно знать о File::set_max_buf_size, который управляет максимальным количеством байт, которое файл Tokio будет читать или записывать за один вызов spawn_blocking. По умолчанию это два мегабайта, но это может измениться.

Структуры

ИмяОписание
DirBuilderПостроитель для создания каталогов различными способами.
DirEntryЗаписи, возвращаемые потоком ReadDir.
FileСсылка на открытый файл в файловой системе.
OpenOptionsОпции и флаги, которые можно использовать для настройки открытия файла.
ReadDirЧтение записей в каталоге.

Функции

ИмяОписание
canonicalizeВозвращает каноническую, абсолютную форму пути с нормализованными промежуточными компонентами и разрешенными символическими ссылками.
copyКопирует содержимое одного файла в другой. Также копирует биты разрешений исходного файла в файл назначения. Перезаписывает содержимое файла назначения.
create_dirСоздает новый пустой каталог по указанному пути.
create_dir_allРекурсивно создает каталог и все его родительские компоненты, если они отсутствуют.
hard_linkСоздает новую жесткую ссылку в файловой системе.
metadataПолучает информацию о файле, каталоге и т.д. по указанному пути.
readЧитает все содержимое файла в вектор байтов.
read_dirВозвращает поток записей в каталоге.
read_linkЧитает символическую ссылку, возвращая файл, на который она указывает.
read_to_stringСоздает future, который откроет файл для чтения и прочитает все содержимое в строку.
remove_dirУдаляет существующий пустой каталог.
remove_dir_allУдаляет каталог по этому пути, предварительно удалив все его содержимое. Используйте осторожно!
remove_fileУдаляет файл из файловой системы.
renameПереименовывает файл или каталог в новое имя, заменяя исходный файл, если to уже существует.
set_permissionsИзменяет разрешения файла или каталога.
symlinkUnix: Создает новую символическую ссылку в файловой системе.
symlink_dirWindows: Создает новую символическую ссылку на каталог.
symlink_fileWindows: Создает новую символическую ссылку на файл.
symlink_metadataПолучает метаданные файловой системы для пути.
try_existsВозвращает Ok(true), если путь указывает на существующую сущность.
writeСоздает future, который откроет файл для записи и запишет все содержимое contents в него.

Модуль io

Трейты, вспомогательные средства и определения типов для асинхронной функциональности ввода-вывода.

Этот модуль является асинхронной версией std::io. В основном он определяет два трейта: AsyncRead и AsyncWrite, которые являются асинхронными версиями трейтов Read и Write из стандартной библиотеки.

AsyncRead и AsyncWrite

Как и трейты Read и Write стандартной библиотеки, AsyncRead и AsyncWrite предоставляют наиболее общий интерфейс для чтения и записи ввода и вывода. Однако, в отличие от трейтов стандартной библиотеки, они асинхронны — это означает, что чтение из или запись в тип tokio::io будет уступать планировщику Tokio, когда ввод-вывод не готов, вместо блокировки. Это позволяет другим задачам выполняться во время ожидания ввода-вывода.

Другое отличие заключается в том, что AsyncRead и AsyncWrite содержат только основные методы, необходимые для предоставления асинхронной функциональности чтения и записи. Вместо этого служебные методы определены в трейтах-расширениях AsyncReadExt и AsyncWriteExt. Эти трейты автоматически реализуются для всех значений, которые реализуют AsyncRead и AsyncWrite соответственно.

Конечные пользователи редко будут взаимодействовать напрямую с AsyncRead и AsyncWrite. Вместо этого они будут использовать асинхронные функции, определенные в трейтах-расширениях. Ожидается, что авторы библиотек будут реализовывать AsyncRead и AsyncWrite, чтобы предоставлять типы, которые ведут себя как потоки байтов.

Даже с этими различиями, трейты AsyncRead и AsyncWrite Tokio можно использовать почти точно так же, как Read и Write стандартной библиотеки. Большинство типов в стандартной библиотеке, которые реализуют Read и Write, имеют асинхронные эквиваленты в tokio, которые реализуют AsyncRead и AsyncWrite, такие как File и TcpStream.

Например, документация стандартной библиотеки представляет Read, демонстрируя чтение некоторых байтов из std::fs::File. Мы можем сделать то же самое с tokio::fs::File:

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // читаем до 10 байт
    let n = f.read(&mut buffer).await?;

    println!("Байты: {:?}", &buffer[..n]);
    Ok(())
}

Буферизированные читатели и писатели

Интерфейсы на основе байтов неудобны и могут быть неэффективными, так как нам потребуется делать почти постоянные вызовы к операционной системе. Чтобы помочь с этим, std::io поставляется с поддержкой буферизированных читателей и писателей, и, следовательно, tokio::io тоже.

Tokio предоставляет асинхронную версию трейта std::io::BufReadAsyncBufRead; а также асинхронные структуры BufReader и BufWriter, которые оборачивают читателей и писателей. Эти обертки используют буфер, уменьшая количество вызовов и предоставляя более удобные методы для доступа именно к тому, что вам нужно.

Например, BufReader работает с трейтом AsyncBufRead, чтобы добавить дополнительные методы к любому асинхронному читателю:

use tokio::io::{self, BufReader, AsyncBufReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let f = File::open("foo.txt").await?;
    let mut reader = BufReader::new(f);
    let mut buffer = String::new();

    // читаем строку в buffer
    reader.read_line(&mut buffer).await?;

    println!("{}", buffer);
    Ok(())
}

BufWriter не добавляет новых способов записи; он просто буферизует каждый вызов write. Однако вы должны вызывать flush для BufWriter, чтобы убедиться, что все буферизированные данные записаны.

use tokio::io::{self, BufWriter, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let f = File::create("foo.txt").await?;
    {
        let mut writer = BufWriter::new(f);

        // Записываем байт в буфер.
        writer.write(&[42u8]).await?;

        // Сбрасываем буфер перед выходом из области видимости.
        writer.flush().await?;

    } // Если не сброшено или не закрыто, содержимое буфера отбрасывается при удалении.

    Ok(())
}

Реализация AsyncRead и AsyncWrite

Поскольку это трейты, мы можем реализовать AsyncRead и AsyncWrite для наших собственных типов. Обратите внимание, что эти трейты должны реализовываться только для неблокирующих типов ввода-вывода, которые интегрируются с системой типов futures. Другими словами, эти типы никогда не должны блокировать поток, и вместо этого текущая задача уведомляется, когда ресурс ввода-вывода готов.

Преобразование в Stream/Sink и обратно

Часто удобно инкапсулировать чтение и запись байтов в Stream или Sink данных.

Tokio предоставляет простые обертки для преобразования AsyncRead в Stream и обратно в крейте tokio-util, см. ReaderStream и StreamReader.

Также существуют служебные трейты, которые абстрагируют асинхронную буферизацию, необходимую для написания собственных адаптеров для кодирования и декодирования байтов в/из ваших структурированных данных, позволяя преобразовать что-то, что реализует AsyncRead/AsyncWrite, в Stream/Sink, см. Decoder и Encoder в модуле tokio-util::codec.

Стандартный ввод и вывод

Tokio предоставляет асинхронные API для стандартного ввода, вывода и ошибки. Эти API очень похожи на предоставляемые std, но они также реализуют AsyncRead и AsyncWrite.

Обратите внимание, что API стандартного ввода/вывода должны использоваться в контексте среды выполнения Tokio, так как они требуют функций, специфичных для Tokio, чтобы функционировать. Вызов этих функций вне среды выполнения Tokio вызовет панику.

Реэкспорт из std

Дополнительно Error, ErrorKind, Result и SeekFrom реэкспортируются из std::io для удобства использования.

#![allow(unused)]
fn main() {
pub use std::io::Error;
pub use std::io::ErrorKind;
pub use std::io::Result;
pub use std::io::SeekFrom;
}

Модули

ИмяФлагиОписание
bsdFreeBSD и netСпецифичные для BSD типы ввода-вывода.
unixUnix и netАсинхронные структуры ввода-вывода, специфичные для Unix-подобных ОС.

Структуры

ИмяФлагиОписание
BufReaderio-utilДобавляет буферизацию любому читателю.
BufStreamio-utilОборачивает тип, который является AsyncWrite и AsyncRead, и буферизует его ввод и вывод.
BufWriterio-utilОборачивает писатель и буферизует его вывод.
Chainio-utilПоток для метода chain.
DuplexStreamio-utilДуплексный канал для чтения и записи байтов в памяти.
Emptyio-utilИгнорирует любые данные, записанные через AsyncWrite, и всегда будет пуст (возвращает ноль байт) при чтении через AsyncRead.
InterestСм. описаниеИнтерес события готовности.
Joinio-utilОбъединяет два значения, реализующих AsyncRead и AsyncWrite, в один дескриптор.
Linesio-utilЧитает строки из AsyncBufRead.
ReadBufОбертка вокруг байтового буфера, который постепенно заполняется и инициализируется.
ReadHalfio-utilЧитаемая половина значения, возвращенного из split.
ReadyСм. описаниеОписывает состояние готовности ресурса ввода-вывода.
Repeatio-utilАсинхронный читатель, который бесконечно повторяет один байт.
SimplexStreamio-utilОднонаправленный канал для чтения и записи байтов в памяти.
Sinkio-utilАсинхронный писатель, который будет перемещать данные в пустоту.
Splitio-utilРазделитель для метода split.
Stderrio-stdДескриптор потока стандартной ошибки процесса.
Stdinio-stdДескриптор потока стандартного ввода процесса.
Stdoutio-stdДескриптор потока стандартного вывода процесса.
Takeio-utilПоток для метода take.
WriteHalfio-utilЗаписываемая половина значения, возвращенного из split.

Трейты

ИмяФлагиОписание
AsyncBufReadАсинхронно читает байты.
AsyncBufReadExtio-utilТрейт-расширение, добавляющий служебные методы к типам AsyncBufRead.
AsyncReadЧитает байты из источника.
AsyncReadExtio-utilЧитает байты из источника.
AsyncSeekАсинхронно перемещается по байтам.
AsyncSeekExtio-utilТрейт-расширение, добавляющий служебные методы к типам AsyncSeek.
AsyncWriteАсинхронно записывает байты.
AsyncWriteExtio-utilЗаписывает байты в приемник.

Функции

ИмяФлагиОписание
copyio-utilАсинхронно копирует все содержимое читателя в писатель.
copy_bidirectionalio-utilКопирует данные в обоих направлениях между a и b.
copy_bidirectional_with_sizesio-utilКопирует данные в обоих направлениях между a и b с использованием буферов указанного размера.
copy_bufio-utilАсинхронно копирует все содержимое читателя в писатель.
duplexio-utilСоздает новую пару DuplexStream, которые ведут себя как пара соединенных сокетов.
emptyio-utilСоздает значение, которое всегда находится в EOF при чтении и игнорирует все записанные данные.
joinio-utilОбъединяет два значения, реализующих AsyncRead и AsyncWrite, в один дескриптор.
repeatio-utilСоздает экземпляр асинхронного читателя, который бесконечно повторяет один байт.
simplexio-utilСоздает однонаправленный буфер, который действует как канал в памяти.
sinkio-utilСоздает экземпляр асинхронного писателя, который успешно потребляет все данные.
splitio-utilРазделяет одно значение, реализующее AsyncRead + AsyncWrite, на отдельные дескрипторы AsyncRead и AsyncWrite.
stderrio-stdСоздает новый дескриптор стандартной ошибки текущего процесса.
stdinio-stdСоздает новый дескриптор стандартного ввода текущего процесса.
stdoutio-stdСоздает новый дескриптор стандартного вывода текущего процесса.

Модуль net

Доступно только в non-loom сборках.

TCP/UDP/Unix привязки для tokio.

Этот модуль содержит сетевые типы TCP/UDP/Unix, аналогичные стандартной библиотеке, которые могут использоваться для реализации сетевых протоколов.

Организация

  • TcpListener и TcpStream предоставляют функциональность для связи по TCP
  • UdpSocket предоставляет функциональность для связи по UDP
  • UnixListener и UnixStream предоставляют функциональность для связи через Unix Domain Stream Socket (доступно только в Unix)
  • UnixDatagram предоставляет функциональность для связи через Unix Domain Datagram Socket (доступно только в Unix)
  • tokio::net::unix::pipe для FIFO каналов (доступно только в Unix)
  • tokio::net::windows::named_pipe для именованных каналов (доступно только в Windows)

Для ресурсов ввода-вывода, недоступных в tokio::net, вы можете использовать AsyncFd.

Модули

ИмяФлагиОписание
tcpnetТипы утилит TCP.
unixUnix и netСпецифичные для Unix сетевые типы.
windowsWindows и netСпецифичные для Windows сетевые типы.

Структуры

ИмяФлагиОписание
TcpListenernetTCP серверный сокет, прослушивающий подключения.
TcpSocketNon-WASITCP сокет, который еще не был преобразован в TcpStream или TcpListener.
TcpStreamnetTCP поток между локальным и удаленным сокетом.
UdpSocketnetUDP сокет.
UnixDatagramUnix и netОбъект ввода-вывода, представляющий Unix датаграммный сокет.
UnixListenerUnix и netUnix сокет, который может принимать подключения от других Unix сокетов.
UnixSocketUnix и netUnix сокет, который еще не был преобразован в UnixStream, UnixDatagram или UnixListener.
UnixStreamUnix и netСтруктура, представляющая подключенный Unix сокет.

Трейты

ИмяОписание
ToSocketAddrsПреобразует или разрешает (без блокировки) в одно или несколько значений SocketAddr.

Функции

ИмяФлагиОписание
lookup_hostnetВыполняет DNS разрешение.

Модуль process

Доступно только с флагом функции process.

Реализация асинхронного управления процессами для Tokio.

Этот модуль предоставляет структуру Command, которая имитирует интерфейс типа std::process::Command из стандартной библиотеки, но предоставляет асинхронные версии функций, создающих процессы. Эти функции (spawn, status, output и их варианты) возвращают типы, "осведомленные" о future, которые взаимодействуют с Tokio. Поддержка асинхронных процессов обеспечивается через обработку сигналов в Unix и системные API в Windows.

Примеры

Вот пример программы, которая запустит echo hello world и затем будет ждать его завершения.

use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Использование аналогично типу `Command` стандартной библиотеки
    let mut child = Command::new("echo")
        .arg("hello")
        .arg("world")
        .spawn()
        .expect("не удалось запустить");

    // Ожидаем завершения команды
    let status = child.wait().await?;
    println!("команда завершилась с: {}", status);
    Ok(())
}

Теперь рассмотрим пример, где мы не только запускаем echo hello world, но и захватываем его вывод.

use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Как и выше, но используем `output`, который возвращает future вместо
    // немедленного возврата `Child`.
    let output = Command::new("echo").arg("hello").arg("world")
                        .output();

    let output = output.await?;

    assert!(output.status.success());
    assert_eq!(output.stdout, b"hello world\n");
    Ok(())
}

Мы также можем читать ввод построчно.

use tokio::io::{BufReader, AsyncBufReadExt};
use tokio::process::Command;

use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut cmd = Command::new("cat");

    // Указываем, что хотим получить стандартный вывод команды через канал.
    // По умолчанию стандартный ввод/вывод/ошибка наследуются от текущего процесса
    // (например, это означает, что стандартный ввод будет поступать с клавиатуры,
    // а стандартный вывод/ошибка будут направляться непосредственно в терминал,
    // если этот процесс вызван из командной строки).
    cmd.stdout(Stdio::piped());

    let mut child = cmd.spawn()
        .expect("не удалось запустить команду");

    let stdout = child.stdout.take()
        .expect("дочерний процесс не имел дескриптора stdout");

    let mut reader = BufReader::new(stdout).lines();

    // Убеждаемся, что дочерний процесс запущен в среде выполнения, чтобы он мог
    // самостоятельно прогрессировать, пока мы ожидаем какой-либо вывод.
    tokio::spawn(async move {
        let status = child.wait().await
            .expect("дочерний процесс столкнулся с ошибкой");

        println!("статус дочернего процесса: {}", status);
    });

    while let Some(line) = reader.next_line().await? {
        println!("Строка: {}", line);
    }

    Ok(())
}

Вот еще один пример использования sort с записью в стандартный ввод дочернего процесса и захватом вывода отсортированного текста.

use tokio::io::AsyncWriteExt;
use tokio::process::Command;

use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut cmd = Command::new("sort");

    // Указываем, что хотим канал как для вывода, так и для ввода.
    // Аналогично захвату вывода, настройка канала для stdin позволяет
    // использовать его как асинхронный писатель.
    cmd.stdout(Stdio::piped());
    cmd.stdin(Stdio::piped());

    let mut child = cmd.spawn().expect("не удалось запустить команду");

    // Животные, которые мы хотим отсортировать
    let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];

    let mut stdin = child
        .stdin
        .take()
        .expect("дочерний процесс не имел дескриптора stdin");

    // Записываем наших животных в дочерний процесс
    // Обратите внимание, что поведение `sort` - буферизировать _весь ввод_ перед записью любого вывода.
    // В общем смысле рекомендуется писать в дочерний процесс в отдельной задаче,
    // одновременно ожидая его завершения (или вывода), чтобы избежать взаимоблокировок
    // (например, дочерний процесс пытается записать какой-то вывод, но зависает в ожидании,
    // пока родительский процесс прочитает его, в то время как родительский процесс
    // завис в ожидании полной записи своего ввода перед чтением вывода).
    stdin
        .write(animals.join("\n").as_bytes())
        .await
        .expect("не удалось записать в stdin");

    // Мы удаляем дескриптор здесь, что сигнализирует EOF дочернему процессу.
    // Это сообщает дочернему процессу, что в канале больше нет данных.
    drop(stdin);

    let op = child.wait_with_output().await?;

    // Результаты должны вернуться в отсортированном порядке
    assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());

    Ok(())
}

С некоторой координацией мы также можем передавать вывод одной команды в другую.

use tokio::join;
use tokio::process::Command;
use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut echo = Command::new("echo")
        .arg("hello world!")
        .stdout(Stdio::piped())
        .spawn()
        .expect("не удалось запустить echo");

    let tr_stdin: Stdio = echo
        .stdout
        .take()
        .unwrap()
        .try_into()
        .expect("не удалось преобразовать в Stdio");

    let tr = Command::new("tr")
        .arg("a-z")
        .arg("A-Z")
        .stdin(tr_stdin)
        .stdout(Stdio::piped())
        .spawn()
        .expect("не удалось запустить tr");

    let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());

    assert!(echo_result.unwrap().success());

    let tr_output = tr_output.expect("не удалось дождаться tr");
    assert!(tr_output.status.success());

    assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");

    Ok(())
}

Особенности

Удаление/Отмена

Аналогично поведению стандартной библиотеки и в отличие от парадигмы futures, где удаление подразумевает отмену, запущенный процесс по умолчанию продолжит выполняться даже после удаления дескриптора Child.

Метод Command::kill_on_drop может быть использован для изменения этого поведения и убийства дочернего процесса, если обертка Child удалена до его завершения.

Процессы Unix

В Unix-платформах процессы должны быть "убраны" (reaped) их родительским процессом после завершения, чтобы освободить все ресурсы ОС. Дочерний процесс, который завершился, но еще не был убран своим родителем, считается "зомби"-процессом. Такие процессы продолжают учитываться в ограничениях, накладываемых системой, и наличие слишком большого количества зомби-процессов может препятствовать запуску дополнительных процессов.

Среда выполнения Tokio будет пытаться убирать и очищать любые процессы, которые она запустила, на основе принципа "максимального усилия". Не дается дополнительных гарантий относительно того, насколько быстро или как часто эта процедура будет выполняться.

Рекомендуется избегать удаления дескриптора дочернего процесса до его полного ожидания, если требуются более строгие гарантии очистки.

Структуры

ИмяОписание
ChildПредставление дочернего процесса, запущенного в цикле событий.
ChildStderrПоток стандартной ошибки для запущенных дочерних процессов.
ChildStdinПоток стандартного ввода для запущенных дочерних процессов.
ChildStdoutПоток стандартного вывода для запущенных дочерних процессов.
CommandЭта структура имитирует API std::process::Command из стандартной библиотеки, но заменяет функции, создающие процесс, асинхронными вариантами. Основные предоставляемые асинхронные функции - spawn, status и output.

Модуль runtime

Доступно только с флагом функции rt.

Среда выполнения Tokio.

В отличие от других программ на Rust, асинхронные приложения требуют поддержки среды выполнения. В частности, необходимы следующие службы среды выполнения:

  • Цикл событий ввода-вывода, называемый драйвером, который управляет ресурсами ввода-вывода и распределяет события ввода-вывода между задачами, которые от них зависят.
  • Планировщик для выполнения задач, использующих эти ресурсы ввода-вывода.
  • Таймер для планирования работы, которая должна выполниться через заданный промежуток времени.

Runtime Tokio объединяет все эти службы в один тип, позволяя запускать, останавливать и настраивать их вместе. Однако часто не требуется настраивать Runtime вручную, и пользователь может просто использовать атрибут-макрос tokio::main, который создает Runtime "под капотом".

Выбор среды выполнения

Вот эмпирические правила для выбора правильной среды выполнения для вашего приложения:

   +------------------------------------------------------+
   | Нужен ли планировщик с work-stealing или многопоточный? |
   +------------------------------------------------------+
                   | Да               | Нет
                   |                  |
                   |                  |
                   v                  |
     +------------------------+       |
     | Многопоточный Runtime  |       |
     +------------------------+       |
                                      |
                                      V
                     +--------------------------------+
                     | Выполняете ли вы `!Send` Future? |
                     +--------------------------------+
                           | Да                  | Нет
                           |                     |
                           V                     |
             +--------------------------+        |
             | Local Runtime (нестабильный) |    |
             +--------------------------+        |
                                                 |
                                                 v
                                     +------------------------+
                                     | Текущий поток Runtime  |
                                     +------------------------+

Приведенное дерево решений не является исчерпывающим. Существуют другие факторы, которые могут повлиять на ваше решение.

Связывание с синхронным кодом

Подробности см. на https://tokio.rs/tokio/topics/bridging.

Осведомленность о NUMA

Среда выполнения Tokio не осведомлена о NUMA (Non-Uniform Memory Access). Для лучшей производительности на системах с NUMA вы можете захотеть запустить несколько сред выполнения вместо одной.

Использование

Когда точная настройка не требуется, можно использовать атрибут-макрос tokio::main.

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // В цикле читаем данные из сокета и записываем данные обратно.
            loop {
                let n = match socket.read(&mut buf).await {
                    // сокет закрыт
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        println!("не удалось прочитать из сокета; err = {:?}", e);
                        return;
                    }
                };

                // Записываем данные обратно
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    println!("не удалось записать в сокет; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

Из контекста среды выполнения дополнительные задачи порождаются с помощью функции tokio::spawn. Future, порожденные с помощью этой функции, будут выполняться в том же пуле потоков, который используется Runtime.

Экземпляр Runtime также может использоваться напрямую.

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Создаем среду выполнения
    let rt  = Runtime::new()?;

    // Порождаем корневую задачу
    rt.block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;

        loop {
            let (mut socket, _) = listener.accept().await?;

            tokio::spawn(async move {
                let mut buf = [0; 1024];

                // В цикле читаем данные из сокета и записываем данные обратно.
                loop {
                    let n = match socket.read(&mut buf).await {
                        // сокет закрыт
                        Ok(0) => return,
                        Ok(n) => n,
                        Err(e) => {
                            println!("не удалось прочитать из сокета; err = {:?}", e);
                            return;
                        }
                    };

                    // Записываем данные обратно
                    if let Err(e) = socket.write_all(&buf[0..n]).await {
                        println!("не удалось записать в сокет; err = {:?}", e);
                        return;
                    }
                }
            });
        }
    })
}

Конфигурации среды выполнения

Tokio предоставляет несколько стратегий планирования задач, подходящих для разных приложений. Для выбора планировщика можно использовать построитель среды выполнения или атрибут #[tokio::main].

Многопоточный планировщик

Многопоточный планировщик выполняет future в пуле потоков, используя стратегию work-stealing. По умолчанию он запускает рабочий поток для каждого доступного ядра CPU в системе. Это, как правило, идеальная конфигурация для большинства приложений. Многопоточный планировщик требует флага функции rt-multi-thread и выбирается по умолчанию:

#![allow(unused)]
fn main() {
use tokio::runtime;

let threaded_rt = runtime::Runtime::new()?;
}

Большинство приложений должны использовать многопоточный планировщик, за исключением некоторых узких случаев использования, таких как когда требуется выполнение только в одном потоке.

Планировщик текущего потока

Планировщик текущего потока предоставляет однопоточный исполнитель future. Все задачи будут создаваться и выполняться в текущем потоке. Это требует флага функции rt.

#![allow(unused)]
fn main() {
use tokio::runtime;

let rt = runtime::Builder::new_current_thread()
    .build()?;
}

Драйверы ресурсов

При ручной настройке среды выполнения никакие драйверы ресурсов не включены по умолчанию. В этом случае попытка использования сетевых типов или типов времени завершится неудачей. Чтобы включить эти типы, необходимо включить драйверы ресурсов. Это делается с помощью Builder::enable_io и Builder::enable_time. В качестве сокращения Builder::enable_all включает оба драйвера ресурсов.

Время жизни порожденных потоков

Среда выполнения может порождать потоки в зависимости от ее конфигурации и использования. Многопоточный планировщик порождает потоки для планирования задач и для вызовов spawn_blocking.

Пока Runtime активен, потоки могут завершаться после периодов простоя. После удаления Runtime все потоки среды выполнения обычно завершаются, но при наличии непрекращаемой порожденной работы не гарантируется, что они были завершены. Подробнее см. документацию на уровне структуры.

Подробное поведение среды выполнения

Этот раздел дает более подробную информацию о том, как среда выполнения Tokio будет планировать задачи для выполнения.

На самом базовом уровне среда выполнения имеет коллекцию задач, которые нужно запланировать. Она будет повторно удалять задачу из этой коллекции и планировать ее (вызывая poll). Когда коллекция пуста, поток перейдет в спящий режим до тех пор, пока в коллекцию не будет добавлена задача.

Однако вышесказанного недостаточно для гарантии корректного поведения среды выполнения. Например, среда выполнения может иметь одну задачу, которая всегда готова к планированию, и планировать эту задачу каждый раз. Это проблема, потому что это "морит голодом" другие задачи, не планируя их. Чтобы решить эту проблему, Tokio предоставляет следующую гарантию честности:

Если общее количество задач не растет безгранично и никакая задача не блокирует поток, то гарантируется, что задачи планируются честно.

Или, более формально:

При следующих двух предположениях:

  1. Существует некоторое число MAX_TASKS, такое что общее количество задач в среде выполнения в любой конкретный момент времени никогда не превышает MAX_TASKS.
  2. Существует некоторое число MAX_SCHEDULE, такое что вызов poll для любой задачи, порожденной в среде выполнения, возвращается в течение MAX_SCHEDULE единиц времени.

Тогда существует некоторое число MAX_DELAY, такое что когда задача пробуждается, она будет запланирована средой выполнения в течение MAX_DELAY единиц времени.

(Здесь MAX_TASKS и MAX_SCHEDULE могут быть любыми числами, и пользователь среды выполнения может выбрать их. Число MAX_DELAY контролируется средой выполнения и зависит от значения MAX_TASKS и MAX_SCHEDULE.)

Помимо приведенной выше гарантии честности, нет никаких гарантий относительно порядка, в котором задачи планируются. Также нет гарантии, что среда выполнения одинаково честна ко всем задачам. Например, если у среды выполнения есть две задачи A и B, которые обе готовы, то среда выполнения может запланировать A пять раз, прежде чем запланирует B. Это верно, даже если A уступает с помощью yield_now. Все, что гарантируется, это то, что она запланирует B в конечном итоге.

Обычно задачи планируются только если они были пробуждены вызовом wake на их waker. Однако это не гарантируется, и Tokio может планировать задачи, которые не были пробуждены, при некоторых обстоятельствах. Это называется ложным пробуждением (spurious wakeup).

Ввод-вывод и таймеры

Помимо планирования задач, среда выполнения также должна управлять ресурсами ввода-вывода и таймерами. Она делает это, периодически проверяя, есть ли какие-либо ресурсы ввода-вывода или таймеры, которые готовы, и пробуждая соответствующую задачу, чтобы она была запланирована.

Эти проверки выполняются периодически между планированием задач. При тех же предположениях, что и предыдущая гарантия честности, Tokio гарантирует, что она пробудит задачи с событием ввода-вывода или таймера в течение некоторого максимального количества единиц времени.

Реэкспорт

#![allow(unused)]
fn main() {
pub use dump::Dump; // `tokio_unstable` и `taskdump` и `Linux` и (`AArch64` или `x86` или `x86-64`)
}

Модули

ИмяФлагиОписание
dumptokio_unstable и taskdump и Linux и (AArch64 или x86 или x86-64)Снимки состояния среды выполнения.

Структуры

ИмяФлагиОписание
BuilderСтроит Tokio Runtime с пользовательскими значениями конфигурации.
EnterGuardЗащита контекста среды выполнения.
HandleДескриптор среды выполнения.
HistogramConfigurationtokio_unstableКонфигурация для гистограммы количества опросов.
Idtokio_unstableНепрозрачный ID, однозначно идентифицирующий среду выполнения относительно всех других текущих сред выполнения.
LocalOptionstokio_unstableОпции конфигурации только для LocalRuntime.
LocalRuntimetokio_unstableЛокальная среда выполнения Tokio.
LogHistogramtokio_unstableЛогарифмическая гистограмма.
LogHistogramBuildertokio_unstableКонфигурация для LogHistogram.
RngSeedtokio_unstableSeed для генерации случайных чисел.
RuntimeСреда выполнения Tokio.
RuntimeMetricsДескриптор метрик среды выполнения.
TaskMetatokio_unstableМетаданные задачи, предоставляемые пользовательскими хуками для событий задач.
TryCurrentErrorОшибка, возвращаемая try_current, когда среда выполнения не запущена.

Перечисления

ИмяФлагиОписание
HistogramScaletokio_unstableИспользует ли гистограмма для агрегации метрики линейную или логарифмическую шкалу.
InvalidHistogramConfigurationtokio_unstableОшибка построения гистограммы.
RuntimeFlavorТип (flavor) среды выполнения.
UnhandledPanictokio_unstableКак среда выполнения должна реагировать на необработанные паники.

Модуль signal

Доступно только с флагом функции signal.

Асинхронная обработка сигналов для Tokio.

Обратите внимание, что обработка сигналов в целом является очень сложной темой и должна использоваться с большой осторожностью. Этот крейт пытается реализовать "лучшие практики" для обработки сигналов, но его следует оценить с учетом потребностей ваших собственных приложений, чтобы убедиться в его пригодности.

Также существуют некоторые фундаментальные ограничения этого крейта, документированные в OS-специфичных структурах.

Примеры

Вывод при получении уведомления "ctrl-c"

use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    signal::ctrl_c().await?;
    println!("получен ctrl-c!");
    Ok(())
}

Ожидание SIGHUP в Unix

use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Бесконечный поток сигналов hangup.
    let mut stream = signal(SignalKind::hangup())?;

    // Выводим сообщение при каждом получении сигнала HUP
    loop {
        stream.recv().await;
        println!("получен сигнал HUP");
    }
}

Модули

ИмяФлагиОписание
unixUnixСпецифичные для Unix типы для обработки сигналов.
windowsWindowsСпецифичные для Windows типы для обработки сигналов.

Функции

ИмяОписание
ctrl_cЗавершается, когда в процесс отправляется уведомление "ctrl-c".

Модуль stream

Из-за того, что включение трейта Stream в std произошло позже выпуска Tokio 1.0, большинство утилит потоков Tokio были перемещены в крейт tokio-stream.

Почему Stream не был включен в Tokio 1.0?

Изначально мы планировали выпустить Tokio 1.0 со стабильным типом Stream, но, к сожалению, RFC не был вовремя принят, чтобы Stream попал в std на стабильном компиляторе к моменту выпуска Tokio 1.0. По этой причине команда решила переместить все утилиты на основе Stream в крейт tokio-stream. Хотя это не идеально, как только Stream попадет в стандартную библиотеку и период MSRV истечет, мы реализуем stream для наших различных типов.

Хотя это может показаться неудачным, не все потеряно, так как вы можете получить большую часть поддержки Stream с помощью async/await и циклов while let. Также возможно создать impl Stream из async fn с помощью крейта async-stream.

Пример

Преобразование sync::mpsc::Receiver в impl Stream

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::<usize>(16);

let stream = async_stream::stream! {
    while let Some(item) = rx.recv().await {
        yield item;
    }
};
}

Использование tokio-stream для дополнительной функциональности

#![allow(unused)]
fn main() {
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel::<usize>(16);

// Преобразование Receiver в Stream с помощью tokio-stream
let mut stream = ReceiverStream::new(rx);

// Использование методов StreamExt
while let Some(item) = stream.next().await {
    println!("получен элемент: {}", item);
}
}

Создание пользовательского Stream с async-stream

#![allow(unused)]
fn main() {
use async_stream::stream;
use std::time::Duration;
use tokio::time::sleep;

let mut counter = 0;
let stream = stream! {
    for _ in 0..5 {
        sleep(Duration::from_secs(1)).await;
        counter += 1;
        yield counter;
    }
};

// Использование созданного потока
tokio::pin!(stream);
while let Some(value) = stream.next().await {
    println!("значение: {}", value);
}
}

Рекомендации

Для работы с потоками в Tokio рекомендуется:

  1. Использовать крейт tokio-stream для дополнительных утилит потоков
  2. Использовать async-stream для создания пользовательских потоков
  3. Использовать комбинацию async/await и циклов while let для базовой обработки потоков
  4. Следить за обновлениями относительно включения Stream в стандартную библиотеку

Альтернативы

До стабилизации Stream в std можно использовать следующие подходы:

  • Использовать futures::Stream из крейта futures
  • Использовать async-stream для создания потоков
  • Использовать циклы while let с асинхронными итераторами
  • Использовать каналы Tokio (mpsc, broadcast и др.) как потоки данных

Модуль sync

Доступно только с флагом функции sync.

Примитивы синхронизации для использования в асинхронных контекстах.

Программы Tokio обычно организованы как набор задач, где каждая задача работает независимо и может выполняться на отдельных физических потоках. Примитивы синхронизации, предоставляемые в этом модуле, позволяют этим независимым задачам взаимодействовать друг с другом.

Передача сообщений

Наиболее распространенной формой синхронизации в программе Tokio является передача сообщений. Две задачи работают независимо и отправляют сообщения друг другу для синхронизации. Это имеет преимущество в виде избегания разделяемого состояния.

Передача сообщений реализована с использованием каналов. Канал поддерживает отправку сообщения от одной задачи-производителя к одной или нескольким задачам-потребителям. Tokio предоставляет несколько разновидностей каналов. Каждая разновидность канала поддерживает различные шаблоны передачи сообщений. Когда канал поддерживает несколько производителей, многие отдельные задачи могут отправлять сообщения. Когда канал поддерживает несколько потребителей, многие различные отдельные задачи могут получать сообщения.

Tokio предоставляет множество различных разновидностей каналов, поскольку различные шаблоны передачи сообщений лучше обрабатываются разными реализациями.

Канал oneshot

Канал oneshot поддерживает отправку одного значения от одного производителя к одному потребителю. Этот канал обычно используется для отправки результата вычисления ожидающей стороне.

Пример: использование канала oneshot для получения результата вычисления.

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

async fn some_computation() -> String {
    "представляет результат вычисления".to_string()
}

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    let res = some_computation().await;
    tx.send(res).unwrap();
});

// Делаем другую работу, пока вычисление происходит в фоне

// Ожидаем результат вычисления
let res = rx.await.unwrap();
}

Примечание: если задача производит результат вычисления как свое последнее действие перед завершением, можно использовать JoinHandle для получения этого значения вместо выделения ресурсов для канала oneshot. Ожидание JoinHandle возвращает Result. Если задача паникует, JoinHandle возвращает Err с причиной паники.

Пример:

#![allow(unused)]
fn main() {
async fn some_computation() -> String {
    "результат вычисления".to_string()
}

let join_handle = tokio::spawn(async move {
    some_computation().await
});

// Делаем другую работу, пока вычисление происходит в фоне

// Ожидаем результат вычисления
let res = join_handle.await.unwrap();
}

Канал mpsc

Канал mpsc поддерживает отправку многих значений от многих производителей к одному потребителю. Этот канал часто используется для отправки работы задаче или для получения результата многих вычислений.

Это также канал, который вы должны использовать, если хотите отправлять много сообщений от одного производителя к одному потребителю. Нет специального канала spsc.

Пример: использование mpsc для постепенной потоковой передачи результатов серии вычислений.

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("результат вычисления {}", input)
}

let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
    for i in 0..10 {
        let res = some_computation(i).await;
        tx.send(res).await.unwrap();
    }
});

while let Some(res) = rx.recv().await {
    println!("получено = {}", res);
}
}

Аргумент mpsc::channel - это емкость канала. Это максимальное количество значений, которые могут храниться в канале в ожидании получения в любой момент времени. Правильная установка этого значения является ключевой для реализации надежных программ, поскольку емкость канала играет критическую роль в обработке обратного давления.

Распространенным шаблоном параллелизма для управления ресурсами является порождение задачи, посвященной управлению этим ресурсом, и использование передачи сообщений между другими задачами для взаимодействия с ресурсом. Ресурсом может быть что угодно, что не может использоваться конкурентно. Некоторые примеры включают сокет и состояние программы. Например, если нескольким задачам нужно отправлять данные через один сокет, породите задачу для управления сокетом и используйте канал для синхронизации.

Пример: отправка данных из многих задач через один сокет с использованием передачи сообщений.

use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100);

    for _ in 0..10 {
        // Каждой задаче нужен собственный дескриптор `tx`. Это делается путем клонирования
        // исходного дескриптора.
        let tx = tx.clone();

        tokio::spawn(async move {
            tx.send(&b"данные для записи"[..]).await.unwrap();
        });
    }

    // Половина `rx` канала возвращает `None`, когда **все** клоны `tx`
    // удалены. Чтобы гарантировать возврат `None`, удалите дескриптор, принадлежащий
    // текущей задаче. Если этот дескриптор `tx` не удален, всегда будет
    // один оставшийся дескриптор `tx`.
    drop(tx);

    while let Some(res) = rx.recv().await {
        socket.write_all(res).await?;
    }

    Ok(())
}

Каналы mpsc и oneshot можно комбинировать для предоставления шаблона синхронизации типа запрос/ответ с разделяемым ресурсом. Задача порождается для синхронизации ресурса и ожидает команды, полученные по каналу mpsc. Каждая команда включает oneshot Sender, на который отправляется результат команды.

Пример: использование задачи для синхронизации счетчика u64. Каждая задача отправляет команду "получить и увеличить". Значение счетчика до увеличения отправляется через предоставленный канал oneshot.

#![allow(unused)]
fn main() {
use tokio::sync::{oneshot, mpsc};
use Command::Increment;

enum Command {
    Increment,
    // Здесь можно добавить другие команды
}

let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);

// Порождение задачи для управления счетчиком
tokio::spawn(async move {
    let mut counter: u64 = 0;

    while let Some((cmd, response)) = cmd_rx.recv().await {
        match cmd {
            Increment => {
                let prev = counter;
                counter += 1;
                response.send(prev).unwrap();
            }
        }
    }
});

let mut join_handles = vec![];

// Порождение задач, которые будут отправлять команду увеличения.
for _ in 0..10 {
    let cmd_tx = cmd_tx.clone();

    join_handles.push(tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();

        cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
        let res = resp_rx.await.unwrap();

        println!("предыдущее значение = {}", res);
    }));
}

// Ожидание завершения всех задач
for join_handle in join_handles.drain(..) {
    join_handle.await.unwrap();
}
}

Канал broadcast

Канал broadcast поддерживает отправку многих значений от многих производителей ко многим потребителям. Каждый потребитель получит каждое значение. Этот канал можно использовать для реализации шаблонов типа "fan out", распространенных в системах pub/sub или "chat".

Этот канал используется реже, чем oneshot и mpsc, но все еще имеет свои случаи использования.

Это также канал, который вы должны использовать, если хотите транслировать значения от одного производителя ко многим потребителям. Нет специального канала spmc broadcast.

Базовое использование:

#![allow(unused)]
fn main() {
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();
}

Канал watch

Канал watch поддерживает отправку многих значений от многих производителей ко многим потребителям. Однако в канале хранится только самое последнее значение. Потребители уведомляются, когда отправляется новое значение, но нет гарантии, что потребители увидят все значения.

Канал watch похож на канал broadcast с емкостью 1.

Случаи использования канала watch включают трансляцию изменений конфигурации или сигнализацию изменений состояния программы, таких как переход к завершению работы.

Пример: использование канала watch для уведомления задач об изменениях конфигурации. В этом примере файл конфигурации проверяется периодически. Когда файл изменяется, изменения конфигурации сигнализируются потребителям.

#![allow(unused)]
fn main() {
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};

use std::io;

#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
    timeout: Duration,
}

impl Config {
    async fn load_from_file() -> io::Result<Config> {
        // логика загрузки файла и десериализации здесь
    }
}

async fn my_async_operation() {
    // Делаем что-то здесь
}

// Загружаем начальное значение конфигурации
let mut config = Config::load_from_file().await.unwrap();

// Создаем канал watch, инициализированный загруженной конфигурацией
let (tx, rx) = watch::channel(config.clone());

// Порождение задачи для мониторинга файла.
tokio::spawn(async move {
    loop {
        // Ждем 10 секунд между проверками
        time::sleep(Duration::from_secs(10)).await;

        // Загружаем файл конфигурации
        let new_config = Config::load_from_file().await.unwrap();

        // Если конфигурация изменилась, отправляем новое значение конфигурации
        // по каналу watch.
        if new_config != config {
            tx.send(new_config.clone()).unwrap();
            config = new_config;
        }
    }
});

let mut handles = vec![];

// Порождение задач, которые выполняют асинхронную операцию не более `timeout`. Если
// время ожидания истекает, перезапускаем операцию.
//
// Задача одновременно отслеживает изменения `Config`. Когда
// длительность таймаута изменяется, таймаут обновляется без перезапуска
// выполняемой операции.
for _ in 0..5 {
    // Клонируем дескриптор watch конфигурации для использования в этой задаче
    let mut rx = rx.clone();

    let handle = tokio::spawn(async move {
        // Начинаем первоначальную операцию и закрепляем future в стеке.
        // Закрепление в стеке требуется для возобновления операции
        // при множественных вызовах `select!`
        let op = my_async_operation();
        tokio::pin!(op);

        // Получаем начальное значение конфигурации
        let mut conf = rx.borrow().clone();

        let mut op_start = Instant::now();
        let sleep = time::sleep_until(op_start + conf.timeout);
        tokio::pin!(sleep);

        loop {
            tokio::select! {
                _ = &mut sleep => {
                    // Операция истекла. Перезапускаем ее
                    op.set(my_async_operation());

                    // Отслеживаем новое время начала
                    op_start = Instant::now();

                    // Перезапускаем таймаут
                    sleep.set(time::sleep_until(op_start + conf.timeout));
                }
                _ = rx.changed() => {
                    conf = rx.borrow_and_update().clone();

                    // Конфигурация была обновлена. Обновляем
                    // `sleep` используя новое значение `timeout`.
                    sleep.as_mut().reset(op_start + conf.timeout);
                }
                _ = &mut op => {
                    // Операция завершена!
                    return
                }
            }
        }
    });

    handles.push(handle);
}

for handle in handles.drain(..) {
    handle.await.unwrap();
}
}

Синхронизация состояния

Оставшиеся примитивы синхронизации сосредоточены на синхронизации состояния. Это асинхронные эквиваленты версий, предоставляемых std. Они работают аналогично своим аналогам из std, но будут ждать асинхронно вместо блокировки потока.

  • Barrier - Гарантирует, что несколько задач будут ждать друг друга для достижения точки в программе, прежде чем продолжить выполнение все вместе.
  • Mutex - Механизм взаимного исключения, который гарантирует, что не более одного потока в данный момент времени может получить доступ к некоторым данным.
  • Notify - Базовое уведомление задачи. Notify поддерживает уведомление принимающей задачи без отправки данных. В этом случае задача просыпается и возобновляет обработку.
  • RwLock - Предоставляет механизм взаимного исключения, который позволяет нескольким читателям одновременно, позволяя только одному писателю за раз. В некоторых случаях это может быть более эффективно, чем мьютекс.
  • Semaphore - Ограничивает количество параллелизма. Семафор содержит некоторое количество разрешений, которые задачи могут запрашивать для входа в критическую секцию. Семафоры полезны для реализации ограничений любого рода.

Совместимость со средой выполнения

Все примитивы синхронизации, предоставляемые в этом модуле, не зависят от среды выполнения. Вы можете свободно перемещать их между различными экземплярами среды выполнения Tokio или даже использовать их из сред выполнения, отличных от Tokio.

При использовании в среде выполнения Tokio примитивы синхронизации участвуют в кооперативном планировании, чтобы избежать голодания. Эта функция не применяется при использовании из сред выполнения, отличных от Tokio.

В качестве исключения, методы, оканчивающиеся на _timeout, не являются независимыми от среды выполнения, поскольку они требуют доступа к таймеру Tokio. См. документацию каждого метода *_timeout для получения дополнительной информации о его использовании.

Модули

ИмяОписание
broadcastМногопроизводительная, многопотребительская широковещательная очередь. Каждое отправленное значение видно всем потребителям.
futuresИменованные типы future.
mpscМногопроизводительная, однопотребительская очередь для отправки значений между асинхронными задачами.
oneshotОдноразовый канал используется для отправки одного сообщения между асинхронными задачами. Функция channel используется для создания пары дескрипторов Sender и Receiver, которые образуют канал.
watchМногопроизводительный, многопотребительский канал, который сохраняет только последнее отправленное значение.

Структуры

ИмяОписание
AcquireErrorОшибка, возвращаемая функцией Semaphore::acquire.
BarrierБарьер позволяет нескольким задачам синхронизировать начало некоторого вычисления.
BarrierWaitResultBarrierWaitResult возвращается wait, когда все задачи в Barrier встретились.
MappedMutexGuardДескриптор удерживаемого Mutex, к которому была применена функция через MutexGuard::map.
MutexАсинхронный тип, подобный Mutex.
MutexGuardДескриптор удерживаемого Mutex. Защита может удерживаться через любую точку .await, так как она Send.
NotifyУведомляет одну задачу о пробуждении.
OnceCellПотокобезопасная ячейка, в которую можно записать только один раз.
OwnedMappedMutexGuardВладеемый дескриптор удерживаемого Mutex, к которому была применена функция через OwnedMutexGuard::map.
OwnedMutexGuardВладеемый дескриптор удерживаемого Mutex.
OwnedRwLockMappedWriteGuardВладеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
OwnedRwLockReadGuardВладеемая структура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении.
OwnedRwLockWriteGuardВладеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
OwnedSemaphorePermitВладеемое разрешение от семафора.
RwLockАсинхронная блокировка читатель-писатель.
RwLockMappedWriteGuardСтруктура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
RwLockReadGuardСтруктура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении.
RwLockWriteGuardСтруктура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
SemaphoreСчитающий семафор, выполняющий асинхронное получение разрешений.
SemaphorePermitРазрешение от семафора.
SetOnceПотокобезопасная ячейка, в которую можно записать только один раз.
SetOnceErrorОшибка, которая может быть возвращена из SetOnce::set.
TryLockErrorОшибка, возвращаемая функциями Mutex::try_lock, RwLock::try_read и RwLock::try_write.

Перечисления

ИмяОписание
SetErrorОшибки, которые могут быть возвращены из OnceCell::set.
TryAcquireErrorОшибка, возвращаемая функцией Semaphore::try_acquire.

Модуль task

Асинхронные "зеленые потоки" (легковесные задачи).

Что такое задачи?

Задача — это легковесная, неблокирующая единица выполнения. Задача похожа на поток операционной системы, но вместо управления планировщиком ОС, ими управляет среда выполнения Tokio. Другое название этого общего шаблона — зеленые потоки (green threads). Если вы знакомы с горутинами Go, корутинами Kotlin или процессами Erlang, вы можете считать задачи Tokio чем-то подобным.

Ключевые моменты о задачах:

  • Задачи легковесны. Поскольку задачи планируются средой выполнения Tokio, а не операционной системой, создание новых задач или переключение между задачами не требует переключения контекста и имеет довольно низкие накладные расходы. Создание, выполнение и уничтожение большого количества задач достаточно дешево, особенно по сравнению с потоками ОС.

  • Задачи планируются кооперативно. Большинство операционных систем реализуют вытесняющую многозадачность. Это техника планирования, при которой ОС позволяет каждому потоку выполняться в течение периода времени, а затем вытесняет его, временно приостанавливая этот поток и переключаясь на другой. Задачи, напротив, реализуют кооперативную многозадачность. При кооперативной многозадачности задаче разрешено выполняться до тех пор, пока она не уступит (yield), указывая планировщику среды выполнения Tokio, что в данный момент она не может продолжать выполнение. Когда задача уступает, среда выполнения Tokio переключается на выполнение следующей задачи.

  • Задачи неблокирующие. Обычно, когда поток ОС выполняет ввод-вывод или должен синхронизироваться с другим потоком, он блокируется, позволяя ОС запланировать другой поток. Когда задача не может продолжить выполнение, она должна уступить, позволяя среде выполнения Tokio запланировать другую задачу. Задачи, как правило, не должны выполнять системные вызовы или другие операции, которые могут заблокировать поток, поскольку это предотвратит выполнение других задач, работающих в том же потоке. Вместо этого этот модуль предоставляет API для выполнения блокирующих операций в асинхронном контексте.

Работа с задачами

Этот модуль предоставляет следующие API для работы с задачами:

Порождение задач (Spawning)

Возможно, самой важной функцией в этом модуле является task::spawn. Эту функцию можно рассматривать как асинхронный эквивалент thread::spawn из стандартной библиотеки. Она принимает асинхронный блок или другой future и создает новую задачу для конкурентного выполнения этой работы:

#![allow(unused)]
fn main() {
use tokio::task;

task::spawn(async {
    // выполняем некоторую работу здесь...
});
}

Как и std::thread::spawn, task::spawn возвращает структуру JoinHandle. Сам JoinHandle является future, который может быть использован для ожидания результата порожденной задачи. Например:

#![allow(unused)]
fn main() {
use tokio::task;

let join = task::spawn(async {
    // ...
    "hello world!"
});

// ...

// Ожидаем результат порожденной задачи.
let result = join.await?;
assert_eq!(result, "hello world!");
}

Опять же, как и тип JoinHandle из std::thread, если порожденная задача паникует, ожидание ее JoinHandle вернет JoinError. Например:

#![allow(unused)]
fn main() {
use tokio::task;

let join = task::spawn(async {
    panic!("something bad happened!")
});

// Возвращенный результат указывает, что задача завершилась неудачно.
assert!(join.await.is_err());
}

spawn, JoinHandle и JoinError доступны при включенном флаге функции "rt".

Отмена (Cancellation)

Порожденные задачи могут быть отменены с помощью методов JoinHandle::abort или AbortHandle::abort. При вызове одного из этих методов задаче сигнализируют о завершении работы в следующий раз, когда она уступит в точке .await. Если задача уже простаивает, она будет завершена как можно скорее, без повторного запуска перед завершением. Кроме того, завершение работы среды выполнения Tokio (например, возврат из #[tokio::main]) немедленно отменяет все задачи в ней.

При завершении задач выполнение остановится на том .await, на котором она уступила. Все локальные переменные уничтожаются путем запуска их деструкторов. После завершения остановки ожидание JoinHandle завершится ошибкой отмены.

Обратите внимание, что отмена задачи не гарантирует, что она завершится с ошибкой отмены, поскольку она может сначала завершиться нормально. Например, если задача не уступает среде выполнения ни в одной точке между вызовом abort и концом задачи, то JoinHandle вместо этого сообщит, что задача завершилась нормально.

Имейте в виду, что задачи, порожденные с помощью spawn_blocking, не могут быть отменены, потому что они не являются асинхронными. Если вы вызовете abort для задачи spawn_blocking, это не окажет никакого эффекта, и задача продолжит выполняться нормально. Исключение составляет случай, когда задача еще не начала выполняться; в этом случае вызов abort может предотвратить запуск задачи.

Имейте в виду, что вызовы JoinHandle::abort только планируют отмену задачи и возвращаются до завершения отмены. Чтобы дождаться завершения отмены, дождитесь завершения задачи, ожидая JoinHandle. Аналогично, метод JoinHandle::is_finished не возвращает true, пока отмена не завершится.

Многократный вызов JoinHandle::abort имеет тот же эффект, что и однократный вызов.

Tokio также предоставляет AbortHandle, который похож на JoinHandle, за исключением того, что он не предоставляет механизма ожидания завершения задачи. Каждая задача может иметь только один JoinHandle, но она может иметь более одного AbortHandle.

Блокирование и уступка (Blocking and Yielding)

Как мы обсуждали выше, код, выполняющийся в асинхронных задачах, не должен выполнять операции, которые могут блокировать. Блокирующая операция, выполненная в задаче, работающей в потоке, который также выполняет другие задачи, заблокирует весь поток, не позволяя другим задачам выполняться.

Вместо этого Tokio предоставляет два API для выполнения блокирующих операций в асинхронном контексте: task::spawn_blocking и task::block_in_place.

Имейте в виду, что если вы вызываете не-асинхронный метод из асинхронного кода, этот не-асинхронный метод все еще находится внутри асинхронного контекста, поэтому вам также следует избегать там блокирующих операций. Это включает деструкторы объектов, уничтожаемых в асинхронном коде.

spawn_blocking

Функция task::spawn_blocking похожа на функцию task::spawn, обсуждавшуюся в предыдущем разделе, но вместо порождения неблокирующего future в среде выполнения Tokio она порождает блокирующую функцию в выделенном пуле потоков для блокирующих задач. Например:

#![allow(unused)]
fn main() {
use tokio::task;

task::spawn_blocking(|| {
    // выполняем ресурсоемкую работу или вызываем синхронный код
});
}

Так же, как task::spawn, task::spawn_blocking возвращает JoinHandle, который мы можем использовать для ожидания результата блокирующей операции:

#![allow(unused)]
fn main() {
let join = task::spawn_blocking(|| {
    // выполняем ресурсоемкую работу или вызываем синхронный код
    "blocking completed"
});

let result = join.await?;
assert_eq!(result, "blocking completed");
}

block_in_place

При использовании многопоточной среды выполнения также доступна функция task::block_in_place. Как и task::spawn_blocking, эта функция позволяет запускать блокирующую операцию из асинхронного контекста. Однако, в отличие от spawn_blocking, block_in_place работает путем перевода текущего рабочего потока в блокирующий поток, перемещая другие задачи, выполняющиеся в этом потоке, на другой рабочий поток. Это может повысить производительность, избегая переключений контекста.

Например:

#![allow(unused)]
fn main() {
use tokio::task;

let result = task::block_in_place(|| {
    // выполняем ресурсоемкую работу или вызываем синхронный код
    "blocking completed"
});

assert_eq!(result, "blocking completed");
}

yield_now

Кроме того, этот модуль предоставляет асинхронную функцию task::yield_now, которая аналогична thread::yield_now из стандартной библиотеки. Вызов и ожидание этой функции заставят текущую задачу уступить планировщику среды выполнения Tokio, позволяя запланировать другие задачи. В конечном итоге уступившая задача будет опрошена снова, что позволит ей выполниться. Например:

#![allow(unused)]
fn main() {
use tokio::task;

async {
    task::spawn(async {
        // ...
        println!("spawned task done!")
    });

    // Уступаем, позволяя сначала выполниться новой задаче.
    task::yield_now().await;
    println!("main task done!");
}
}

Модули

ИмяФлагиОписание
cooprtУтилиты для улучшенного кооперативного планирования.
futuresrtFuture, связанные с задачами.
join_setrtКоллекция задач, порожденных в среде выполнения Tokio.

Структуры

ИмяФлагиОписание
AbortHandlertВладение разрешением на отмену порожденной задачи без ожидания ее завершения.
Buildertokio_unstable и tracingФабрика, используемая для настройки свойств новой задачи.
IdrtНепрозрачный ID, однозначно идентифицирующий задачу относительно других текущих.
JoinErrorrtЗадача не выполнилась до завершения.
JoinHandlertВладение разрешением на присоединение к задаче (ожидание ее завершения).
JoinSetrtКоллекция задач, порожденных в среде выполнения Tokio.
LocalEnterGuardrtКонтекстная защита для LocalSet.
LocalKeyrtКлюч для локальных данных задачи.
LocalSetrtНабор задач, которые выполняются в одном потоке.

Функции

ИмяФлагиОписание
block_in_placert-multi-threadВыполняет предоставленную блокирующую функцию в текущем потоке без блокировки исполнителя.
idrtВозвращает Id текущей выполняемой задачи.
spawnrtПорождает новую асинхронную задачу, возвращая JoinHandle для нее.
spawn_blockingrtВыполняет предоставленное замыкание в потоке, где блокировка допустима.
spawn_localrtПорождает future !Send в текущем LocalSet или LocalRuntime.
try_idrtВозвращает Id текущей выполняемой задачи или None, если вызвано вне задачи.
yield_nowrtУступает выполнение обратно среде выполнения Tokio.

Модуль time

Доступно только с флагом функции time.

Утилиты для отслеживания времени.

Этот модуль предоставляет ряд типов для выполнения кода по истечении заданного периода времени.

  • Sleep - это future, который не выполняет работы и завершается в определенный момент времени Instant.

  • Interval - это поток, выдающий значение с фиксированным периодом. Он инициализируется Duration и повторно выдает значение каждый раз, когда длительность истекает.

  • Timeout - оборачивает future или поток, устанавливая верхнюю границу времени, которое ему разрешено выполняться. Если future или поток не завершается вовремя, он отменяется и возвращается ошибка.

Этих типов достаточно для обработки большого количества сценариев, связанных со временем.

Эти типы должны использоваться в контексте Runtime.

Примеры

Ожидание 100 мс и вывод сообщения

#![allow(unused)]
fn main() {
use std::time::Duration;
use tokio::time::sleep;

sleep(Duration::from_millis(100)).await;
println!("100 мс прошло");
}

Требование, чтобы операция занимала не более 1 секунды

#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};

async fn long_future() {
    // выполняем работу здесь
}

let res = timeout(Duration::from_secs(1), long_future()).await;

if res.is_err() {
    println!("операция превысила время ожидания");
}
}

Простой пример использования interval для выполнения задачи каждые две секунды

Разница между interval и sleep заключается в том, что interval измеряет время с последнего тика, что означает, что .tick().await может ждать меньше времени, чем указанная для интервала длительность, если между вызовами .tick().await прошло некоторое время.

Если в примере ниже заменить tick на sleep, задача будет выполняться только раз в три секунды, а не каждые две секунды.

#![allow(unused)]
fn main() {
use tokio::time;

async fn task_that_takes_a_second() {
    println!("hello");
    time::sleep(time::Duration::from_secs(1)).await
}

let mut interval = time::interval(time::Duration::from_secs(2));
for _i in 0..5 {
    interval.tick().await;
    task_that_takes_a_second().await;
}
}

Использование таймаута с потоком

#![allow(unused)]
fn main() {
use tokio::time::{timeout, Duration};
use tokio_stream::{self as stream, StreamExt};

let mut stream = stream::iter(1..=10);
let result = timeout(Duration::from_secs(1), async move {
    while let Some(value) = stream.next().await {
        println!("обрабатываем значение: {}", value);
        // Имитация работы
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
}).await;

match result {
    Ok(_) => println!("поток завершился успешно"),
    Err(_) => println!("поток превысил время ожидания"),
}
}

Использование sleep_until для планирования на определенное время

#![allow(unused)]
fn main() {
use tokio::time::{sleep_until, Instant};
use std::time::Duration;

// Запланировать выполнение через 5 секунд от текущего момента
let deadline = Instant::now() + Duration::from_secs(5);
sleep_until(deadline).await;
println!("5 секунд прошло с момента планирования");
}

Реэкспорт

#![allow(unused)]
fn main() {
pub use std::time::Duration;
}

Модули

ИмяФлагиОписание
errorТипы ошибок времени.

Структуры

ИмяОписание
InstantИзмерение монотонно неубывающих часов. Непрозрачно и полезно только с Duration.
IntervalИнтервал, возвращаемый interval и interval_at.
SleepFuture, возвращаемый sleep и sleep_until.
TimeoutFuture, возвращаемый timeout и timeout_at.

Перечисления

ИмяОписание
MissedTickBehaviorОпределяет поведение Interval при пропуске тика.

Функции

ИмяФлагиОписание
advancetest-utilПеремещает время вперед.
intervalСоздает новый Interval, который выдает значения с интервалом period. Первый тик завершается немедленно. Поведение по умолчанию для пропущенных тиков - Burst, но это можно настроить, вызвав set_missed_tick_behavior.
interval_atСоздает новый Interval, который выдает значения с интервалом period, причем первый тик завершается в start. Поведение по умолчанию для пропущенных тиков - Burst, но это можно настроить, вызвав set_missed_tick_behavior.
pausetest-utilПриостанавливает время.
resumetest-utilВозобновляет время.
sleepОжидает, пока не пройдет указанная длительность.
sleep_untilОжидает, пока не будет достигнут указанный крайний срок.
timeoutТребует, чтобы Future завершился до истечения указанной длительности.
timeout_atТребует, чтобы Future завершился до указанного момента времени.

Дополнительные примеры

Настройка поведения при пропущенных тиках

#![allow(unused)]
fn main() {
use tokio::time::{interval, Duration, MissedTickBehavior};

let mut interval = interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
    interval.tick().await;
    println!("тик");
    // Если эта задача занимает больше 1 секунды, следующие тики будут пропущены
    // до следующего подходящего момента времени
}
}

Комбинирование нескольких таймеров

#![allow(unused)]
fn main() {
use tokio::time::{sleep, timeout, Duration};

async fn complex_operation() -> Result<(), Box<dyn std::error::Error>> {
    // Ожидание 500 мс
    sleep(Duration::from_millis(500)).await;
    
    // Выполнение операции с таймаутом 2 секунды
    let result = timeout(Duration::from_secs(2), async {
        // Длительная операция
        sleep(Duration::from_secs(1)).await;
        "результат операции"
    }).await?;
    
    println!("{}", result);
    Ok(())
}
}

Крейт futures_concurrency

Источник

Производительные, переносимые, структурированные операции конкурентности для асинхронного Rust. Работает с любой средой выполнения, не стирает времена жизни, всегда обрабатывает отмену и всегда возвращает результат вызывающей стороне.

futures-concurrency предоставляет операции конкурентности как для групп future, так и для stream. Как для ограниченных, так и для неограниченных наборов future и stream. В обоих случаях производительность должна быть на уровне, если не превосходить, традиционных реализаций исполнителей.

Примеры

Ожидание нескольких future разных типов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::future;

let a = future::ready(1u8);
let b = future::ready("hello");
let c = future::ready(3u16);
assert_eq!((a, b, c).join().await, (1, "hello", 3));
}

Конкурентная обработка элементов в коллекции

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream()
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello nori"]);
}

Доступ к данным стека вне области видимости future

Адаптировано из std::thread::scope.

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let mut container = vec![1, 2, 3];
let mut num = 0;

let a = async {
    println!("hello from the first future");
    dbg!(&container);
};

let b = async {
    println!("hello from the second future");
    num += container[0] + container[2];
};

println!("hello from the main future");
let _ = (a, b).join().await;
container.push(4);
assert_eq!(num, container.len());
}

Операции

Future

Для future, которые возвращают обычный тип T, доступны только операции join и race. join ожидает завершения всех future, тогда как race ожидает завершения первого future. Однако для future, которые возвращают Try<Output = T>, доступны две дополнительные операции. Следующая таблица описывает поведение операций конкурентности для ошибочных future:

Ожидание всех результатовОжидание первого результата
Продолжать при ошибкеFuture::joinFuture::race_ok
Прерывать при ошибкеFuture::try_joinFuture::race

Следующие реализации future предоставляются futures-concurrency:

  • FutureGroup: Расширяемая группа future, работающих как единое целое.
  • tuple: join, try_join, race, race_ok
  • array: join, try_join, race, race_ok
  • Vec: join, try_join, race, race_ok

Stream

Stream выдают результаты по одному, что означает, что решение о прекращении итерации одинаково для ошибочных и безошибочных потоков. Операции, предоставляемые для потоков, можно классифицировать на основе того, могут ли их входные данные оцениваться конкурентно и могут ли их выходные данные обрабатываться конкурентно.

Конкретно в случае merge, он принимает N потоков и выдает элементы по одному, как только они становятся доступными. Это позволяет конкурентно обрабатывать выходные данные отдельных потоков последующими операциями.

Последовательная обработка выводаКонкурентная обработка вывода
Последовательная оценка вводаStream::chainпока недоступно ‡
Конкурентная оценка вводаStream::zipStream::merge

‡: Это можно было бы решить с помощью гипотетической операции Stream::unzip, однако, поскольку мы стремимся к семантической совместимости с std::iter::Iterator в наших операциях, путь к её добавлению в настоящее время неясен.

Следующие реализации потоков предоставляются futures-concurrency:

  • StreamGroup: Расширяемая группа потоков, работающих как единое целое.
  • ConcurrentStream: Трейт для асинхронных потоков, которые могут конкурентно обрабатывать элементы.
  • tuple: chain, merge, zip
  • array: chain, merge, zip
  • Vec: chain, merge, zip

Поддержка сред выполнения

futures-concurrency не зависит от наличия какого-либо исполнителя среды выполнения. Это позволяет ему работать "из коробки" с любой асинхронной средой выполнения, включая: tokio, async-std, smol, glommio и monoio. Он также поддерживает среды #[no_std], что позволяет использовать его со встроенными асинхронными средами выполнения, такими как embassy.

Флаги функций

Флаг функции std включен по умолчанию. Для нацеливания на среды alloc или no_std вы можете использовать следующую конфигурацию:

[dependencies]
# no_std
futures-concurrency = { version = "7.5.0", default-features = false }

# alloc
futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }

Дополнительное чтение

futures-concurrency разрабатывался в течение нескольких лет. Основной сопровождающий — Йош Вуйтс (Yosh Wuyts), член рабочей группы по асинхронности Rust (Rust Async WG). Подробнее о разработке и идеях, стоящих за futures-concurrency, можно прочитать здесь:

СсылкаКраткое описание темы
Tree Structured Concurrency - What is structured concurrencyВведение в структурированную конкурентность и её основные концепции
Tree Structured Concurrency - Unstructured concurrency exampleПример неструктурированной конкурентности и её проблемы
Tree Structured Concurrency - Structured concurrency exampleПример структурированной конкурентности и её преимущества
Tree Structured Concurrency - What's the worst that can happenПотенциальные проблемы неструктурированной конкурентности
Tree Structured Concurrency - Applying to your programsПрактическое применение структурированной конкурентности
Tree Structured Concurrency - Managed background tasksПаттерн управления фоновыми задачами
Tree Structured Concurrency - Guaranteeing structureГарантии структурированной конкурентности
Tree Structured Concurrency - ConclusionВыводы и заключение по структурированной конкурентности
Tree Structured Concurrency - DijkstraСсылки на работы Дейкстры о структурированном программировании
Why Async RustОбоснование использования асинхронного Rust
Async CancellationОтмена асинхронных операций в Rust
EWD Notes on Structured ProgrammingКлассические заметки Дейкстры о структурированном программировании
Graphs and TreesГрафы и деревья в компьютерных науках
Tree Structured Concurrency - AcademiaАкадемические источники по структурированной конкурентности
Tree Structured Concurrency - RecursionРекурсия в контексте структурированной конкурентности
futures-concurrency docsДокументация крейта futures-concurrency
Tree Structured Concurrency - SQLАналогии с SQL в структурированной конкурентности
smol::Task docsДокументация по Task в крейте smol
smol::Task::cancelМетод отмены задач в smol
Tree Structured Concurrency - FlushОперация flush в контексте конкурентности
async_std::BufWriter docsБуферизованный писатель в async-std
tasky implementationПример реализации структурированных задач
Scoped TasksЗадачи с ограниченной областью видимости
Linearity and ControlЛинейность и управление в программировании
async-task-group docsДокументация крейта для группировки асинхронных задач
Tree Structured Concurrency - State MachineСтруктурированные конечные автоматы
Google's Rust JourneyОпыт Google по внедрению Rust в 2022 году

Модули

ИмяОписание
arrayВспомогательные функции и типы для массивов фиксированной длины.
concurrent_streamКонкурентное выполнение потоков.
futureБазовая асинхронная функциональность.
preludeПредел конкурентности future.
streamКомпонуемая асинхронная итерация.
vecТипы параллельных итераторов для векторов (Vec<T>).

Модули

Модуль array

Вспомогательные функции и типы для массивов фиксированной длины.

Структуры

ИмяОписание
AggregateErrorКоллекция ошибок.
ChainПоток, который объединяет несколько потоков один за другим.
JoinFuture, который ожидает завершения двух future схожего типа.
MergeПоток, который объединяет несколько потоков в один поток.
RaceFuture, который ожидает завершения первого future.
RaceOkFuture, который ожидает завершения первого успешного future.
TryJoinFuture, который ожидает успешного завершения всех future или досрочно прерывается при ошибке.
ZipПоток, который "объединяет" несколько потоков в один поток пар.

Примеры использования

Использование Join для ожидания нескольких future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::future;

let a = future::ready(1);
let b = future::ready(2);
let c = future::ready(3);

let result = [a, b, c].join().await;
assert_eq!(result, [1, 2, 3]);
}

Использование TryJoin для обработки ошибок

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let a = async { Ok::<i32, &str>(1) };
let b = async { Ok::<i32, &str>(2) };
let c = async { Err::<i32, &str>("error") };

let result = [a, b, c].try_join().await;
assert!(result.is_err());
}

Использование Race для гонки future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let fast = async { 1 };
let slow = async {
    sleep(Duration::from_secs(1)).await;
    2
};

let result = [fast, slow].race().await;
assert_eq!(result, 1);
}

Использование Merge для объединения потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let stream1 = stream::iter(vec![1, 2]);
let stream2 = stream::iter(vec![3, 4]);

let mut merged = [stream1, stream2].merge();
let mut results = Vec::new();

while let Some(value) = merged.next().await {
    results.push(value);
}

results.sort();
assert_eq!(results, vec![1, 2, 3, 4]);
}

Использование Zip для параллельной обработки потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);

let mut zipped = [stream1, stream2].zip();
let mut results = Vec::new();

while let Some(values) = zipped.next().await {
    results.push(values);
}

assert_eq!(results, vec![[1, 4], [2, 5], [3, 6]]);
}

Особенности работы с массивами

Модуль array предоставляет специализированные реализации для массивов фиксированной длины, что позволяет эффективно работать с известным на этапе компиляции количеством элементов. Это обеспечивает лучшую производительность и безопасность типов по сравнению с динамическими коллекциями.

Модуль concurrent_stream

Конкурентное выполнение потоков.

Примеры

Конкурентная обработка элементов в коллекции

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream()
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello nori"]);
}

Конкурентная обработка элементов в потоке

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::stream;

let v: Vec<_> = stream::repeat("chashu")
    .co()
    .take(2)
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello chashu"]);
}

Дополнительные примеры использования

Ограничение количества одновременных операций

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::time::{sleep, Duration};

let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
    .into_co_stream()
    .limit(2) // Ограничиваем до 2 одновременных операций
    .map(|n| async move {
        sleep(Duration::from_millis(100)).await;
        n * 2
    })
    .collect()
    .await;

assert_eq!(results, vec![2, 4, 6, 8, 10]);
}

Использование enumerate для получения индексов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let items = vec!["a", "b", "c"];
let results: Vec<_> = items
    .into_co_stream()
    .enumerate()
    .map(|(index, item)| async move {
        format!("{index}: {item}")
    })
    .collect()
    .await;

assert_eq!(results, vec!["0: a", "1: b", "2: c"]);
}

Селективное взятие элементов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
    .into_co_stream()
    .take(3) // Берем только первые 3 элемента
    .map(|n| async move { n * 2 })
    .collect()
    .await;

assert_eq!(results, vec![2, 4, 6]);
}

Структуры

ИмяОписание
EnumerateКонкурентный итератор, который выдает текущий счетчик и элемент во время итерации.
FromStreamКонкурентная реализация for each из Stream.
LimitКонкурентный итератор, который ограничивает количество применяемого параллелизма.
MapПреобразует элементы из одного типа в другой.
TakeКонкурентный итератор, который итерируется только по первым n итерациям.

Перечисления

ИмяОписание
ConsumerStateСостояние потребителя, используется для обратной связи с источником.

Трейты

ИмяОписание
ConcurrentStreamКонкурентная работа с элементами в потоке.
ConsumerОписывает тип, который может получать данные.
FromConcurrentStreamПреобразование из ConcurrentStream.
IntoConcurrentStreamПреобразование в ConcurrentStream.

Практическое применение

Обработка HTTP запросов с ограничением параллелизма

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use reqwest::Client;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let client = Client::new();
    let response = client.get(url).send().await?;
    response.text().await
}

let urls = vec![
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
];

let results: Vec<Result<String, _>> = urls
    .into_co_stream()
    .limit(2) // Максимум 2 одновременных запроса
    .map(|url| async move {
        fetch_url(url).await
    })
    .collect()
    .await;
}

Параллельная обработка файлов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::fs;

async fn process_file(path: &str) -> Result<usize, std::io::Error> {
    let content = fs::read_to_string(path).await?;
    Ok(content.len())
}

let files = vec!["file1.txt", "file2.txt", "file3.txt"];
let sizes: Vec<Result<usize, _>> = files
    .into_co_stream()
    .map(|file| async move {
        process_file(file).await
    })
    .collect()
    .await;
}

Преимущества конкурентных потоков

  • Эффективное использование ресурсов: Операции выполняются конкурентно, а не последовательно
  • Контроль параллелизма: Возможность ограничивать количество одновременных операций
  • Композиционность: Легко комбинировать с другими операциями потоков
  • Безопасность типов: Статическая проверка типов на этапе компиляции

Модуль future

Базовая асинхронная функциональность.

Пожалуйста, ознакомьтесь с ключевыми словами async и await и книгой по асинхронному программированию для получения дополнительной информации об асинхронном программировании в Rust.

Примеры

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::future::block_on;
use std::future;

block_on(async {
    // Ожидание нескольких future одинакового типа.
    let a = future::ready(1);
    let b = future::ready(2);
    let c = future::ready(3);
    assert_eq!([a, b, c].join().await, [1, 2, 3]);
    
    // Ожидание нескольких future разных типов.
    let a = future::ready(1u8);
    let b = future::ready("hello");
    let c = future::ready(3u16);
    assert_eq!((a, b, c).join().await, (1, "hello", 3));

    // Это также работает с векторами future, предоставляя альтернативу
    // `join_all` из futures-rs.
    let a = future::ready(1);
    let b = future::ready(2);
    let c = future::ready(3);
    assert_eq!(vec![a, b, c].join().await, vec![1, 2, 3]);
})
}

Конкурентность

Часто операции зависят от результата нескольких future. Вместо последовательного ожидания каждого future может быть более эффективным ожидать их конкурентно. Rust предоставляет встроенные механизмы в библиотеке, чтобы сделать это простым и удобным.

Безошибочная конкурентность

При работе с future, которые не возвращают типы Result, мы предоставляем две встроенные операции конкурентности:

  • future::Merge: ожидать завершения всех future в наборе
  • future::Race: ожидать завершения первого future в наборе

Поскольку future можно рассматривать как асинхронную последовательность из одного элемента, см. раздел конкурентности асинхронных итераторов для дополнительных операций асинхронной конкурентности.

Ошибочная конкурентность

При работе с future, которые возвращают типы Result, значение существующих операций меняется, и становятся доступными дополнительные операции конкурентности, учитывающие Result:

Ожидание всех результатовОжидание первого результата
Продолжать при ошибкеfuture::Mergefuture::RaceOk
Прерывать при ошибкеfuture::TryMergefuture::Race
  • future::TryMerge: ожидать успешного завершения всех future в наборе или вернуться при первой ошибке.
  • future::RaceOk: ожидать завершения первого успешного future в наборе или вернуть Err, если ни один future не завершился успешно.

Дополнительные примеры

Использование TryMerge с ошибочными future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let success = async { Ok::<i32, &str>(42) };
let error = async { Err::<i32, &str>("something went wrong") };
let another_success = async { Ok::<i32, &str>(100) };

let result = [success, error, another_success].try_join().await;
assert!(result.is_err()); // Прерывается на первой ошибке
}

Использование RaceOk для получения первого успешного результата

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let fast_error = async { 
    sleep(Duration::from_millis(10)).await;
    Err::<i32, &str>("fast error") 
};
let slow_success = async { 
    sleep(Duration::from_millis(50)).await;
    Ok::<i32, &str>(42) 
};
let medium_success = async { 
    sleep(Duration::from_millis(30)).await;
    Ok::<i32, &str>(100) 
};

let result = [fast_error, slow_success, medium_success].race_ok().await;
assert_eq!(result, Ok(100)); // Первый успешный результат
}

Использование FutureGroup для динамического управления future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_concurrency::future::FutureGroup;

let mut group = FutureGroup::new();

// Добавляем future в группу
group.push(async { 1 });
group.push(async { 2 });
group.push(async { 3 });

// Ожидаем завершения всех future в группе
let results = group.join().await;
assert_eq!(results, vec![1, 2, 3]);
}

Практический пример с сетевыми запросами

use futures_concurrency::prelude::*;
use reqwest::Client;

async fn fetch_data(client: &Client, url: &str) -> Result<String, reqwest::Error> {
    let response = client.get(url).send().await?;
    response.text().await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let urls = vec![
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers"
    ];

    // Параллельное выполнение всех запросов
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_data(&client, url))
        .collect();

    let results = futures.try_join().await?;
    
    for (i, result) in results.iter().enumerate() {
        println!("Ответ {}: {} символов", i, result.len());
    }
    
    Ok(())
}

Модули

ИмяОписание
future_groupРасширяемая группа future, которые действуют как единое целое.

Структуры

ИмяОписание
FutureGroupРасширяемая группа future, которые действуют как единое целое.
WaitUntilПриостанавливает future до указанного крайнего срока.

Трейты

ИмяОписание
FutureExtТрейт-расширение для трейта Future.
JoinОжидание завершения всех future.
RaceОжидание завершения первого future.
RaceOkОжидание завершения первого успешного future.
TryJoinОжидание успешного завершения всех future или досрочное прерывание при ошибке.

Ключевые преимущества

  • Типобезопасность: Статическая проверка типов на этапе компиляции
  • Гибкость: Работа с future разных типов и структур данных
  • Эффективность: Конкурентное выполнение вместо последовательного
  • Композиционность: Легкое комбинирование операций
  • Обработка ошибок: Различные стратегии для работы с ошибочными сценариями

Модуль prelude

Предел (prelude) конкурентности futures.

Реэкспорт

#![allow(unused)]
fn main() {
pub use super::future::FutureExt as _;
pub use super::stream::StreamExt as _;
pub use super::future::Join as _;
pub use super::future::Race as _;
pub use super::future::RaceOk as _;
pub use super::future::TryJoin as _;
pub use super::stream::Chain as _;
pub use super::stream::IntoStream as _;
pub use super::stream::Merge as _;
pub use super::stream::Zip as _;
pub use super::concurrent_stream::ConcurrentStream;
pub use super::concurrent_stream::FromConcurrentStream;
pub use super::concurrent_stream::IntoConcurrentStream;
}

Описание

Модуль prelude предоставляет удобный способ импортировать все основные трейты и типы futures-concurrency одним оператором use. Это стандартная практика в экосистеме Rust для упрощения импорта часто используемых компонентов.

Использование

Добавьте следующую строку в начало вашего файла для импорта всех основных компонентов futures-concurrency:

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
}

Что включается в предел

Трейты для Future

  • FutureExt - методы расширения для Future
  • Join - ожидание завершения всех future
  • Race - ожидание завершения первого future
  • RaceOk - ожидание завершения первого успешного future
  • TryJoin - ожидание успешного завершения всех future или досрочное прерывание при ошибке

Трейты для Stream

  • StreamExt - методы расширения для Stream
  • Chain - объединение потоков
  • IntoStream - преобразование в поток
  • Merge - слияние потоков
  • Zip - объединение потоков в пары

Трейты для конкурентных потоков

  • ConcurrentStream - конкурентная работа с элементами в потоке
  • FromConcurrentStream - преобразование из конкурентного потока
  • IntoConcurrentStream - преобразование в конкурентный поток

Примеры использования

Базовое использование

use futures_concurrency::prelude::*;
use std::future;

#[tokio::main]
async fn main() {
    // Использование методов из FutureExt
    let a = future::ready(1);
    let b = future::ready(2);
    
    // Метод join() доступен благодаря импорту трейта Join
    let result = (a, b).join().await;
    assert_eq!(result, (1, 2));
}

Работа с потоками

use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2]);
    let stream2 = stream::iter(vec![3, 4]);
    
    // Метод merge() доступен благодаря импорту трейта Merge
    let merged = (stream1, stream2).merge();
    
    let results: Vec<_> = merged.collect().await;
    // Порядок элементов может варьироваться из-за природы merge
    assert!(results.contains(&1) && results.contains(&4));
}

Конкурентные потоки

use futures_concurrency::prelude::*;

#[tokio::main]
async fn main() {
    let items = vec!["hello", "world"];
    
    // Преобразование в конкурентный поток
    let results: Vec<String> = items
        .into_co_stream()  // Доступно благодаря IntoConcurrentStream
        .map(|s| async move { s.to_uppercase() })
        .collect()         // Доступно благодаря ConcurrentStream
        .await;
    
    assert_eq!(results, vec!["HELLO", "WORLD"]);
}

Преимущества использования prelude

  1. Удобство: Один импорт вместо множества отдельных
  2. Согласованность: Стандартизированный способ импорта
  3. Полнота: Все основные компоненты доступны сразу
  4. Обновляемость: Новые функции автоматически включаются в предел

Примечание

Импорт с as _ означает, что трейты импортируются только для их побочных эффектов (добавления методов к существующим типам), но сами имена трейтов не добавляются в область видимости. Это предотвращает конфликты имен.

Например, после use futures_concurrency::prelude::*; вы можете использовать методы типа join() на кортежах future, но не можете напрямую ссылаться на тип Join.

Модуль stream

Компонуемая асинхронная итерация.

Примеры

Объединение нескольких потоков для обработки значений сразу по готовности

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::stream::{self, StreamExt};
use futures_lite::future::block_on;

block_on(async {
    let a = stream::once(1);
    let b = stream::once(2);
    let c = stream::once(3);
    let s = (a, b, c).merge();

    let mut counter = 0;
    s.for_each(|n| counter += n).await;
    assert_eq!(counter, 6);
})
}

Конкурентность

При работе с несколькими (асинхронными) итераторами порядок, в котором итераторы ожидаются, важен. Как часть асинхронных итераторов, Rust предоставляет встроенные операции для управления порядком выполнения наборов итераторов:

  • merge: объединяет несколько итераторов в один итератор, где новый итератор выдает элемент, как только он становится доступен из одного из базовых итераторов.
  • zip: объединяет несколько итераторов в итератор пар. Базовые итераторы будут ожидаться конкурентно.
  • chain: итерируется по нескольким итераторам последовательно. Следующий итератор в последовательности не начнется, пока предыдущий итератор не завершится.

Future

Future можно рассматривать как асинхронные последовательности из одного элемента. Используя stream::once, future можно преобразовать в асинхронные итераторы и затем использовать с любыми методами конкурентности итераторов. Это позволяет использовать такие операции, как stream::Merge, для конкурентного выполнения наборов future, но получать выходные данные отдельных future, как только они становятся доступны.

См. документацию по конкурентности future для получения дополнительной информации о конкурентности future.

Дополнительные примеры

Использование zip для параллельной обработки потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let names = stream::iter(vec!["Alice", "Bob", "Charlie"]);
let ages = stream::iter(vec![25, 30, 35]);

let zipped = (names, ages).zip();
let mut results = Vec::new();

while let Some((name, age)) = zipped.next().await {
    results.push(format!("{} is {} years old", name, age));
}

assert_eq!(results, vec![
    "Alice is 25 years old",
    "Bob is 30 years old", 
    "Charlie is 35 years old"
]);
}

Использование chain для последовательного соединения потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let first = stream::iter(vec![1, 2]);
let second = stream::iter(vec![3, 4]);
let third = stream::iter(vec![5, 6]);

let chained = (first, second, third).chain();
let results: Vec<_> = chained.collect().await;

assert_eq!(results, vec![1, 2, 3, 4, 5, 6]);
}

Использование StreamGroup для динамического управления потоками

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_concurrency::stream::StreamGroup;
use tokio_stream::{self as stream, StreamExt};

let mut group = StreamGroup::new();

// Добавляем потоки в группу
group.push(stream::iter(vec![1, 2]));
group.push(stream::iter(vec![3, 4]));

// Объединяем все потоки в группе
let mut merged = group.merge();
let mut results = Vec::new();

while let Some(value) = merged.next().await {
    results.push(value);
}

results.sort();
assert_eq!(results, vec![1, 2, 3, 4]);
}

Практический пример с обработкой событий

use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};
use tokio::time::{interval, Duration};
use std::pin::Pin;

#[tokio::main]
async fn main() {
    // Создаем несколько потоков событий
    let user_events = stream::iter(vec!["login", "purchase", "logout"]);
    let system_events = stream::iter(vec!["startup", "shutdown"]);
    let timer_events = {
        let mut interval = interval(Duration::from_secs(1));
        stream::unfold((), move |_| {
            let interval = Pin::new(&mut interval);
            async move {
                interval.tick().await;
                Some(("tick", ()))
            }
        })
    };

    // Объединяем все потоки событий
    let all_events = (user_events, system_events, timer_events.take(2)).merge();
    
    let mut event_count = 0;
    all_events
        .for_each(|event| {
            event_count += 1;
            println!("Обработано событие: {:?}", event);
            async {}
        })
        .await;
        
    println!("Всего обработано событий: {}", event_count);
}

Модули

ИмяОписание
stream_groupРасширяемая группа потоков, которые действуют как единое целое.

Структуры

ИмяОписание
StreamGroupРасширяемая группа потоков, которые действуют как единое целое.
WaitUntilЗадерживает выполнение потока один раз на указанную длительность.

Трейты

ИмяОписание
ChainБерет несколько потоков и создает новый поток для всех последовательно.
IntoStreamПреобразование в Stream.
MergeОбъединяет несколько потоков в один поток всех их выходных данных.
StreamExtТрейт-расширение для трейта Stream.
Zip"Объединяет" несколько потоков в один поток пар.

Ключевые особенности

  • Композиционность: Легко комбинировать различные операции с потоками
  • Конкурентность: Эффективное управление несколькими потоками данных
  • Безопасность типов: Статическая проверка на этапе компиляции
  • Гибкость: Поддержка различных структур данных и паттернов использования
  • Эффективность: Минимизация накладных расходов при работе с потоками

Модуль vec

Типы параллельных итераторов для векторов (Vec<T>)

Вам редко понадобится взаимодействовать с этим модулем напрямую, если только вам не нужно указать имя одного из типов итераторов.

Структуры

ИмяОписание
AggregateErrorКоллекция ошибок.
ChainПоток, который объединяет несколько потоков один за другим.
IntoConcurrentStreamКонкурентный асинхронный итератор, который перемещается из вектора.
JoinFuture, который ожидает завершения нескольких future.
MergeПоток, который объединяет несколько потоков в один поток.
RaceFuture, который ожидает завершения первого future.
RaceOkFuture, который ожидает завершения первого успешного future.
TryJoinFuture, который ожидает успешного завершения всех future или досрочно прерывается при ошибке.
ZipПоток, который "объединяет" несколько потоков в один поток пар.

Примеры использования

Базовое использование с векторами

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let futures = vec![
    async { 1 },
    async { 2 }, 
    async { 3 },
];

let results = futures.join().await;
assert_eq!(results, vec![1, 2, 3]);
}

Конкурентная обработка элементов вектора

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::time::{sleep, Duration};

let items = vec!["hello", "world", "rust"];

let results: Vec<String> = items
    .into_co_stream()  // Преобразование вектора в конкурентный поток
    .map(|s| async move {
        sleep(Duration::from_millis(10)).await;
        s.to_uppercase()
    })
    .collect()
    .await;

assert_eq!(results, vec!["HELLO", "WORLD", "RUST"]);
}

Обработка ошибок с векторами future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let futures = vec![
    async { Ok::<i32, &str>(1) },
    async { Err::<i32, &str>("error occurred") },
    async { Ok::<i32, &str>(3) },
];

let result = futures.try_join().await;
assert!(result.is_err());
}

Гонка future в векторе

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let futures = vec![
    async {
        sleep(Duration::from_millis(100)).await;
        1
    },
    async {
        sleep(Duration::from_millis(50)).await;
        2
    },
    async {
        sleep(Duration::from_millis(10)).await;
        3
    },
];

let winner = futures.race().await;
assert_eq!(winner, 3); // Самый быстрый future
}

Объединение потоков из векторов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let streams = vec![
    stream::iter(vec![1, 2]),
    stream::iter(vec![3, 4]),
    stream::iter(vec![5, 6]),
];

let mut merged = streams.merge();
let mut results = Vec::new();

while let Some(value) = merged.next().await {
    results.push(value);
}

results.sort();
assert_eq!(results, vec![1, 2, 3, 4, 5, 6]);
}

Практический пример: параллельные HTTP запросы

use futures_concurrency::prelude::*;
use reqwest::Client;

async fn fetch_url(client: &Client, url: &str) -> Result<String, reqwest::Error> {
    let response = client.get(url).send().await?;
    response.text().await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let urls = vec![
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent", 
        "https://httpbin.org/headers",
    ];

    // Создаем вектор future
    let fetch_futures: Vec<_> = urls
        .into_iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    // Параллельное выполнение всех запросов
    let results = fetch_futures.try_join().await?;

    for (i, content) in results.iter().enumerate() {
        println!("Запрос {}: {} байт", i, content.len());
    }

    Ok(())
}

Использование RaceOk с вектором ошибочных операций

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let operations = vec![
    async {
        sleep(Duration::from_millis(50)).await;
        Err::<i32, &str>("slow error")
    },
    async {
        sleep(Duration::from_millis(10)).await; 
        Ok::<i32, &str>(42)
    },
    async {
        sleep(Duration::from_millis(30)).await;
        Ok::<i32, &str>(100)
    },
];

let result = operations.race_ok().await;
assert_eq!(result, Ok(42)); // Первый успешный результат
}

Особенности работы с векторами

Модуль vec предоставляет специализированные реализации для типа Vec<T>, что обеспечивает:

  • Эффективность: Оптимизированные реализации для динамических коллекций
  • Гибкость: Работа с переменным количеством элементов
  • Удобство: Простое преобразование существующих векторов
  • Интеграция: Легкое взаимодействие с другими частями стандартной библиотеки

Примечание по использованию

Хотя вы можете напрямую использовать типы из этого модуля, в большинстве случаев достаточно импорта futures_concurrency::prelude::* и использования методов напрямую на векторах:

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

// Вместо прямого использования vec::Join:
// let results = vec::Join::new(futures).await;

// Просто используйте метод на векторе:
let results = futures.join().await;
}

Крейт futures_lite

Источник: Документация futures_lite

Фьючерсы, потоки и комбинаторы для асинхронного ввода-вывода.

Описание

Этот крейт представляет собой подмножество futures, который компилируется на порядок быстрее, исправляет мелкие недостатки в его API, заполняет некоторые очевидные пробелы и удаляет почти весь небезопасный код из него.

Короче говоря, этот крейт стремится быть более удобным, чем futures, но при этом полностью совместимым с ним.

API этого крейта намеренно ограничен. Пожалуйста, ознакомьтесь со списком функций для API, которые исключены из этого крейта.

Примеры

use futures_lite::future;

fn main() {
    future::block_on(async {
        println!("Hello world!");
    })
}

Реэкспорты

ТрейтМодульОписание
AsyncBufReadioАсинхронное чтение с буферизацией
AsyncBufReadExtioРасширения для AsyncBufRead
AsyncReadioАсинхронное чтение
AsyncReadExtioРасширения для AsyncRead
AsyncSeekioАсинхронный поиск
AsyncSeekExtioРасширения для AsyncSeek
AsyncWriteioАсинхронная запись
AsyncWriteExtioРасширения для AsyncWrite
FuturefutureБазовый трейт для асинхронных вычислений
FutureExtfutureРасширения для Future
StreamstreamПоток значений
StreamExtstreamРасширения для Stream

Модули

МодульОписание
futureКомбинаторы для трейта Future
ioИнструменты и комбинаторы для ввода-вывода
preludeТрейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения
streamКомбинаторы для трейта Stream

Макросы

МакросОписание
pinЗакрепляет переменную типа T в стеке и перепривязывает её как Pin<&mut T>
readyРазворачивает Poll<T> или возвращает Pending

Преимущества перед futures

Скорость компиляции

  • На порядок быстрее: Значительно ускоренная компиляция
  • Минимальные зависимости: Меньше зависимостей для компиляции

Улучшения API

  • Исправленные недостатки: Устранены мелкие проблемы оригинального API
  • Заполненные пробелы: Добавлены отсутствующие, но полезные функции
  • Более интуитивный дизайн: Улучшенный пользовательский опыт

Безопасность

  • Минимум unsafe кода: Почти полное отсутствие небезопасного кода
  • Надежность: Повышенная безопасность выполнения

Совместимость

Полная совместимость

  • Интероперабельность: Полная совместимость с крейтом futures
  • Миграция: Легкий переход с futures на futures_lite

Ограниченный API

  • Целевой подход: Только наиболее часто используемые функции
  • Чистота дизайна: Отсутствие перегруженности функциями

Типичное использование

Базовый асинхронный код

#![allow(unused)]
fn main() {
use futures_lite::future;
use futures_lite::stream::StreamExt;

future::block_on(async {
    // Асинхронные операции
});
}

Работа с I/O

#![allow(unused)]
fn main() {
use futures_lite::io::{AsyncReadExt, AsyncWriteExt};
use futures_lite::future;

// Асинхронные операции чтения/записи
}

Потоки данных

#![allow(unused)]
fn main() {
use futures_lite::stream::{self, StreamExt};

let mut stream = stream::iter(vec![1, 2, 3]);
while let Some(item) = stream.next().await {
    println!("{}", item);
}
}

Рекомендации по использованию

Когда использовать futures_lite

  • Новые проекты: Для быстрого старта
  • Производительность компиляции: Когда важна скорость сборки
  • Простота: Для проектов, не требующих полного функционала futures

Когда использовать futures

  • Специфичные функции: При необходимости функций, отсутствующих в futures_lite
  • Существующие проекты: При миграции существующего кода
  • Расширенные сценарии: Для сложных случаев использования

Крейт async_io

Источник: Документация async_io

Асинхронный ввод-вывод и таймеры.

Описание

Этот крейт предоставляет два основных инструмента:

  1. Async - адаптер для стандартных сетевых типов (и многих других типов) для использования в асинхронных программах
  2. Timer - фьючерс или поток, который эмитирует события по времени

Для конкретных асинхронных сетевых типов, построенных на основе этого крейта, см. async-net.

Реализация

При первом использовании Async или Timer создается поток с именем "async-io". Цель этого потока - ожидать события ввода-вывода, о которых сообщает операционная система, а затем пробуждать соответствующие фьючерсы, заблокированные на операциях ввода-вывода или таймерах, когда их можно возобновить.

Для ожидания следующего события ввода-вывода поток "async-io" использует:

  • epoll на Linux/Android/illumos
  • kqueue на macOS/iOS/BSD
  • event ports на illumos/Solaris
  • IOCP на Windows

Эта функциональность предоставляется крейтом polling.

Однако обратите внимание, что вы также можете обрабатывать события ввода-вывода и пробуждать фьючерсы на любом потоке с помощью функции block_on(). Таким образом, поток "async-io" является лишь резервным механизмом обработки событий ввода-вывода на случай, если других потоков нет.

Примеры

Подключение к example.com:80 с таймаутом 10 секунд:

#![allow(unused)]
fn main() {
use async_io::{Async, Timer};
use futures_lite::{future::FutureExt, io};

use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

let addr = "example.com:80".to_socket_addrs()?.next().unwrap();

let stream = Async::<TcpStream>::connect(addr).or(async {
    Timer::after(Duration::from_secs(10)).await;
    Err(io::ErrorKind::TimedOut.into())
})
.await?;
}

Модули

МодульОписание
osПлатформо-специфичная функциональность

Структуры

СтруктураОписание
AsyncАсинхронный адаптер для I/O типов
ReadableФьючерс для Async::readable
ReadableOwnedФьючерс для Async::readable_owned
TimerФьючерс или поток, который эмитирует события по времени
WritableФьючерс для Async::writable
WritableOwnedФьючерс для Async::writable_owned

Трейты

ТрейтОписание
IoSafeТипы, реализации I/O трейтов которых не удаляют базовый I/O источник

Функции

ФункцияОписание
block_onБлокирует текущий поток на фьючерсе, обрабатывая I/O события в простое

Ключевые особенности

Архитектура

  • Фоновый поток: "async-io" поток для обработки событий
  • Кросс-платформенность: Единый API для разных операционных систем
  • Резервный механизм: Основная обработка может происходить в любом потоке

Использование Async адаптера

#![allow(unused)]
fn main() {
use async_io::Async;
use std::net::TcpStream;

// Создание асинхронного TCP-соединения
let stream = Async::<TcpStream>::connect("example.com:80").await?;
}

Использование таймеров

#![allow(unused)]
fn main() {
use async_io::Timer;
use std::time::Duration;

// Ожидание 1 секунды
Timer::after(Duration::from_secs(1)).await;

// Периодический таймер
let mut interval = Timer::interval(Duration::from_secs(1));
while interval.next().await.is_some() {
    println!("Прошла 1 секунда");
}
}

Зависимости

  • polling: Для кросс-платформенного опроса событий ввода-вывода
  • futures-lite: Для асинхронных примитивов и комбинаторов
  • concurrent-queue: Для внутренней синхронизации

Производительность

  • Эффективное пробуждение: Точечное пробуждение только нужных фьючерсов
  • Минимальные накладные расходы: Легковесная реализация
  • Масштабируемость: Поддержка большого количества одновременных операций

Крейт async_compat

Источник: Документация async_compat

Адаптер совместимости между tokio и futures.

Описание

Существует два вида проблем совместимости между tokio и futures:

  1. Типы Tokio не могут использоваться вне контекста tokio, поэтому любая попытка их использования приведет к панике.

    Решение: Если вы примените адаптер Compat к фьючерсу, фьючерс вручную войдет в контекст глобального runtime tokio. Если runtime уже доступен через thread-local переменные tokio, он будет использован. В противном случае, новый однопоточный runtime будет создан по требованию. Это не означает, что фьючерс опрашивается runtime tokio - это означает, что фьючерс устанавливает thread-local переменную, указывающую на глобальный runtime tokio, чтобы типы tokio могли использоваться внутри него.

  2. Tokio и futures имеют похожие, но разные I/O трейты AsyncRead, AsyncWrite, AsyncBufRead и AsyncSeek.

    Решение: Когда адаптер Compat применяется к I/O типу, он будет реализовывать трейты противоположного вида. Таким образом вы можете использовать типы на основе tokio там, где ожидаются типы на основе futures, и наоборот.

Вы можете применить адаптер Compat с помощью конструктора Compat::new() или используя любой метод из трейта CompatExt.

Примеры

Эта программа читает строки из stdin и выводит их в stdout, но она не будет работать:

fn main() -> std::io::Result<()> {
    futures::executor::block_on(async {
        let stdin = tokio::io::stdin();
        let mut stdout = tokio::io::stdout();

        // Следующая строка не будет работать по двум причинам:
        // 1. Ошибка времени выполнения, потому что stdin и stdout используются вне контекста tokio.
        // 2. Ошибка компиляции из-за несовпадения трейтов `AsyncRead` и `AsyncWrite`.
        futures::io::copy(stdin, &mut stdout).await?;
        Ok(())
    })
}

Чтобы обойти проблемы совместимости, примените адаптер Compat к stdin, stdout и futures::io::copy():

use async_compat::CompatExt;

fn main() -> std::io::Result<()> {
    futures::executor::block_on(async {
        let stdin = tokio::io::stdin();
        let mut stdout = tokio::io::stdout();

        futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).compat().await?;
        Ok(())
    })
}

Также возможно применить Compat к внешнему фьючерсу, передаваемому в futures::executor::block_on(), а не к самому futures::io::copy(). Когда адаптер применяется к внешнему фьючерсу, отдельные внутренние фьючерсы не нуждаются в адаптере, потому что они теперь все внутри контекста tokio:

use async_compat::{Compat, CompatExt};

fn main() -> std::io::Result<()> {
    futures::executor::block_on(Compat::new(async {
        let stdin = tokio::io::stdin();
        let mut stdout = tokio::io::stdout();

        futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).await?;
        Ok(())
    }))
}

Адаптер совместимости преобразует между I/O типами на основе tokio и futures в любом направлении. Вот как мы можем написать ту же программу, используя I/O типы на основе futures внутри tokio:

use async_compat::CompatExt;
use blocking::Unblock;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stdin = Unblock::new(std::io::stdin());
    let mut stdout = Unblock::new(std::io::stdout());

    tokio::io::copy(&mut stdin.compat_mut(), &mut stdout.compat_mut()).await?;
    Ok(())
}

Наконец, мы можем использовать любой крейт на основе tokio из любого другого асинхронного runtime. Вот пример с reqwest и warp:

use async_compat::{Compat, CompatExt};
use warp::Filter;

fn main() {
    futures::executor::block_on(Compat::new(async {
        // Выполняем HTTP GET запрос.
        let response = reqwest::get("https://www.rust-lang.org").await.unwrap();
        println!("{}", response.text().await.unwrap());

        // Запускаем HTTP сервер.
        let routes = warp::any().map(|| "Hello from warp!");
        warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
    }))
}

Структуры

СтруктураОписание
CompatАдаптер совместимости для фьючерсов и I/O типов

Трейты

ТрейтОписание
CompatExtПрименяет адаптер Compat к фьючерсам и I/O типам

Ключевые особенности

Решение проблем контекста

  • Автоматическое создание runtime: Создает runtime tokio при необходимости
  • Thread-local контекст: Устанавливает контекст для использования типов tokio
  • Прозрачность: Минимальные изменения в существующем коде

Решение проблем I/O трейтов

  • Двунаправленное преобразование: Совместимость в обоих направлениях
  • Трейты futures ↔ tokio: Автоматическая реализация соответствующих трейтов
  • Универсальность: Работает с любыми I/O типами

Методы трейта CompatExt

МетодНазначение
compat()Применяет адаптер к значению
compat_mut()Применяет адаптер к изменяемой ссылке
compat_ref()Применяет адаптер к неизменяемой ссылке

Сценарии использования

Использование tokio кода с futures исполнителем

#![allow(unused)]
fn main() {
use async_compat::CompatExt;

futures::executor::block_on(async {
    let client = reqwest::Client::new();
    let response = client.get("http://example.com").send().compat().await?;
    // ...
});
}

Смешивание I/O типов

#![allow(unused)]
fn main() {
use async_compat::CompatExt;

// Использование tokio TcpStream с futures кодом
let stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?;
futures::io::copy(&mut stream.compat(), &mut output).await?;
}

Интеграция библиотек

#![allow(unused)]
fn main() {
use async_compat::{Compat, CompatExt};

// Использование warp (tokio) с futures исполнителем
futures::executor::block_on(Compat::new(async {
    warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}));
}

Крейт smol

Источник: Официальная документация smol

Маленький и быстрый асинхронный рантайм.

Этот крейт просто реэкспортирует другие небольшие асинхронные крейты (см. исходный код).

Использование с библиотеками на базе Tokio

Чтобы использовать библиотеки на базе Tokio со smol, примените адаптер async-compat к фьючерсам и типам I/O.

Упрощенная настройка

См. крейт smol-macros, если вам нужна настройка без proc-макросов, с быстрой компиляцией и простым в использовании асинхронным main и/или многопоточным исполнителем (Executor) "из коробки".

Дополнения к smol

В дополнение к проекту smol в качестве прямой замены, вам могут быть полезны и другие части экосистемы фьючерсов, включая futures-concurrency, async-io, futures-lite и async-compat.

Примеры

Подключение к HTTP-сайту, выполнение GET-запроса и вывод ответа в стандартный вывод:

use smol::{io, net, prelude::*, Unblock};

fn main() -> io::Result<()> {
    smol::block_on(async {
        let mut stream = net::TcpStream::connect("example.com:80").await?;
        let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
        stream.write_all(req).await?;

        let mut stdout = Unblock::new(std::io::stdout());
        io::copy(stream, &mut stdout).await?;
        Ok(())
    })
}

Больше примеров можно найти в директории examples.

Модули

МодульОписание
channelАсинхронный многопоточный канал, где каждое сообщение может быть получено только одним из всех существующих потребителей
fsПримитивы для асинхронной работы с файловой системой
futureКомбинаторы для трейта Future
ioИнструменты и комбинаторы для I/O операций
lockАсинхронные примитивы синхронизации
netАсинхронные примитивы для TCP/UDP/Unix коммуникации
preludeТрейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения
processАсинхронный интерфейс для работы с процессами
streamКомбинаторы для трейта Stream

Макросы

МакросОписание
pinФиксирует переменную типа T в стеке и перепривязывает её как Pin<&mut T>
readyРазворачивает Poll<T> или возвращает Pending

Структуры

СтруктураОписание
AsyncАсинхронный адаптер для I/O типов
ExecutorАсинхронный исполнитель
LocalExecutorПоточно-локальный исполнитель
TaskЗапущенная задача
TimerФьючерс или поток, который эмитирует события по времени
UnblockВыполняет блокирующий I/O в пуле потоков

Функции

ФункцияОписание
block_onБлокирует текущий поток на фьючерсе, обрабатывая I/O события в простое
spawnЗапускает задачу в глобальном исполнителе (по умолчанию однопоточном)
unblockВыполняет блокирующий код в пуле потоков

Модули

Крейт channel

Источник: Документация async-channel

Асинхронный многопоточный канал, где каждое сообщение может быть получено только одним из всех существующих потребителей.

Типы каналов

Существует два вида каналов:

  • Ограниченный канал (bounded) с ограниченной емкостью
  • Неограниченный канал (unbounded) с неограниченной емкостью

Структура канала

Канал имеет сторону Отправителя (Sender) и сторону Получателя (Receiver). Обе стороны могут быть клонированы и использоваться из нескольких потоков.

Закрытие канала

Когда все отправители или все получатели уничтожаются, канал закрывается. Когда канал закрыт, нельзя отправлять новые сообщения, но оставшиеся сообщения все еще можно получить.

Канал также можно закрыть вручную, вызвав Sender::close() или Receiver::close().

Примеры

#![allow(unused)]
fn main() {
let (s, r) = async_channel::unbounded();

assert_eq!(s.send("Hello").await, Ok(()));
assert_eq!(r.recv().await, Ok("Hello"));
}

Структуры

СтруктураОписание
ReceiverСторона получения канала
RecvФьючерс, возвращаемый Receiver::recv()
RecvErrorОшибка, возвращаемая из Receiver::recv()
SendФьючерс, возвращаемый Sender::send()
SendErrorОшибка, возвращаемая из Sender::send()
SenderСторона отправки канала
WeakReceiverReceiver, который не препятствует закрытию канала
WeakSenderSender, который не препятствует закрытию канала

Перечисления

ПеречислениеОписание
TryRecvErrorОшибка, возвращаемая из Receiver::try_recv()
TrySendErrorОшибка, возвращаемая из Sender::try_send()

Функции

ФункцияОписание
boundedСоздает ограниченный канал
unboundedСоздает неограниченный канал

Крейт fs

Источник: Документация async-fs

Асинхронные примитивы для работы с файловой системой.

Этот крейт представляет собой асинхронную версию std::fs.

Реализация

Этот крейт использует механизм блокировки (blocking) для выгрузки блокирующих I/O операций в пул потоков.

Примеры

Создание нового файла и запись в него байтов:

#![allow(unused)]
fn main() {
use async_fs::File;
use futures_lite::io::AsyncWriteExt;

let mut file = File::create("a.txt").await?;
file.write_all(b"Hello, world!").await?;
file.flush().await?;
}

Модули

МодульОписание
unixUnix-специфичные расширения

Структуры

СтруктураОписание
DirBuilderПостроитель для создания директорий с настраиваемыми опциями
DirEntryЭлемент в директории
FileОткрытый файл в файловой системе
FileTypeСтруктура, представляющая тип файла с методами доступа для каждого типа файла. Возвращается методом Metadata::file_type
MetadataМетаданные о файле
OpenOptionsПостроитель для открытия файлов с настраиваемыми опциями
PermissionsПредставление различных разрешений файла
ReadDirПоток элементов в директории

Функции

ФункцияОписание
canonicalizeВозвращает каноническую форму пути
copyКопирует файл в новое место
create_dirСоздает новую пустую директорию по указанному пути
create_dir_allРекурсивно создает директорию и все ее родительские компоненты, если они отсутствуют
hard_linkСоздает жесткую ссылку в файловой системе
metadataЧитает метаданные для пути
readЧитает все содержимое файла как сырые байты
read_dirВозвращает поток элементов в директории
read_linkЧитает символическую ссылку и возвращает путь, на который она указывает
read_to_stringЧитает все содержимое файла как строку
remove_dirУдаляет пустую директорию
remove_dir_allУдаляет директорию и все ее содержимое
remove_fileУдаляет файл
renameПереименовывает файл или директорию в новое место
set_permissionsИзменяет разрешения файла или директории
symlink_metadataЧитает метаданные для пути без перехода по символическим ссылкам
writeЗаписывает срез байтов как новое содержимое файла

Модуль future

Источник: Документация futures-lite::future

Комбинаторы для трейта Future.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::future;

for step in 0..3 {
    println!("step {}", step);

    // Даем другим задачам шанс выполниться.
    future::yield_now().await;
}
}

Структуры

СтруктураОписание
CatchUnwindФьючерс для метода FutureExt::catch_unwind()
OrФьючерс для функции or() и метода FutureExt::or()
PendingСоздает фьючерс, который никогда не разрешается, представляя вычисление, которое никогда не завершается
PollFnФьючерс для функции poll_fn()
PollOnceФьючерс для функции poll_once()
RaceФьючерс для функции race() и метода FutureExt::race()
ReadyФьючерс, который сразу готов со значением
TryZipФьючерс для функции try_zip()
YieldNowФьючерс для функции yield_now()
ZipФьючерс для функции zip()

Трейты

ТрейтОписание
FutureФьючерс представляет асинхронное вычисление, обычно получаемое с помощью async
FutureExtТрейт-расширение для Future

Функции

ФункцияОписание
block_onБлокирует текущий поток на фьючерсе
orВозвращает результат фьючерса, который завершится первым, с предпочтением future1, если оба готовы
pendingСоздает фьючерс, который никогда не разрешается, представляя вычисление, которое никогда не завершается
poll_fnСоздает фьючерс из функции, возвращающей Poll
poll_onceОпрашивает фьючерс всего один раз и возвращает Option с результатом
raceВозвращает результат фьючерса, который завершится первым, без предпочтений, если оба готовы
race_with_seedСоревнует два фьючерса, но с предопределенным случайным сидом
readyСоздает фьючерс, который сразу готов со значением
try_zipОбъединяет два фьючерса с возможностью ошибки, ожидая завершения обоих или ошибки одного из них
yield_nowБудит текущую задачу и возвращает Poll::Pending один раз
zipОбъединяет два фьючерса, ожидая завершения обоих

Псевдонимы типов

ПсевдонимОписание
BoxedПсевдоним типа для Pin<Box<dyn Future<Output = T> + Send + 'static>>
BoxedLocalПсевдоним типа для Pin<Box<dyn Future<Output = T> + 'static>>

Модуль io

Источник: Документация futures-lite::io

Инструменты и комбинаторы для ввода-вывода.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::io::{self, AsyncReadExt};

let input: &[u8] = b"hello";
let mut reader = io::BufReader::new(input);

let mut contents = String::new();
reader.read_to_string(&mut contents).await?;
}

Структуры

СтруктураОписание
AssertAsyncУтверждает, что тип, реализующий трейты std::io, может использоваться как асинхронный тип
AsyncAsSyncОбертка вокруг типа, реализующего AsyncRead или AsyncWrite, которая преобразует опросы Pending в ошибки WouldBlock
BlockOnБлокирует все асинхронные I/O операции и реализует трейты std::io
BufReaderДобавляет буферизацию к читателю
BufWriterДобавляет буферизацию к писателю
BytesЧитатель для метода AsyncReadExt::bytes()
ChainЧитатель для метода AsyncReadExt::chain()
CloseFutureФьючерс для метода AsyncWriteExt::close()
CursorПредоставляет буферу в памяти курсор для чтения и записи
EmptyЧитатель для функции empty()
ErrorТип ошибки для операций I/O трейтов Read, Write, Seek и связанных с ними
FillBufФьючерс для метода AsyncBufReadExt::fill_buf()
FlushFutureФьючерс для метода AsyncWriteExt::flush()
LinesПоток для метода AsyncBufReadExt::lines()
ReadExactFutureФьючерс для метода AsyncReadExt::read_exact()
ReadFutureФьючерс для метода AsyncReadExt::read()
ReadHalfПоловина для чтения, возвращаемая split()
ReadLineFutureФьючерс для метода AsyncBufReadExt::read_line()
ReadToEndFutureФьючерс для метода AsyncReadExt::read_to_end()
ReadToStringFutureФьючерс для метода AsyncReadExt::read_to_string()
ReadUntilFutureФьючерс для метода AsyncBufReadExt::read_until()
ReadVectoredFutureФьючерс для метода AsyncReadExt::read_vectored()
RepeatЧитатель для функции repeat()
SeekFutureФьючерс для метода AsyncSeekExt::seek()
SinkПисатель для функции sink()
SplitПоток для метода AsyncBufReadExt::split()
TakeЧитатель для метода AsyncReadExt::take()
WriteAllFutureФьючерс для метода AsyncWriteExt::write_all()
WriteFutureФьючерс для метода AsyncWriteExt::write()
WriteHalfПоловина для записи, возвращаемая split()
WriteVectoredFutureФьючерс для метода AsyncWriteExt::write_vectored()

Перечисления

ПеречислениеОписание
ErrorKindСписок, определяющий общие категории ошибок I/O
SeekFromПеречисление возможных методов поиска в I/O объекте

Трейты

ТрейтОписание
AsyncBufReadАсинхронное чтение байтов с буферизацией
AsyncBufReadExtТрейт-расширение для AsyncBufRead
AsyncReadАсинхронное чтение байтов
AsyncReadExtТрейт-расширение для AsyncRead
AsyncSeekАсинхронный поиск байтов
AsyncSeekExtТрейт-расширение для AsyncSeek
AsyncWriteАсинхронная запись байтов
AsyncWriteExtТрейт-расширение для AsyncWrite

Функции

ФункцияОписание
copyКопирует все содержимое из читателя в писатель
emptyСоздает пустой читатель
repeatСоздает бесконечного читателя, который повторно читает один и тот же байт
sinkСоздает писатель, который потребляет и отбрасывает все данные
splitРазделяет поток на половины AsyncRead и AsyncWrite

Псевдонимы типов

ПсевдонимОписание
BoxedReaderПсевдоним типа для Pin<Box<dyn AsyncRead + Send + 'static>>
BoxedWriterПсевдоним типа для Pin<Box<dyn AsyncWrite + Send + 'static>>
ResultСпециализированный тип Result для операций I/O

Крейт lock

Источник: Документация async-lock

Асинхронные примитивы синхронизации.

Этот крейт предоставляет следующие примитивы:

  • Barrier - позволяет задачам синхронизироваться всем вместе одновременно
  • Mutex - мьютекс (взаимное исключение)
  • RwLock - блокировка читатель-писатель, позволяющая любое количество читателей или одного писателя
  • Semaphore - ограничивает количество одновременных операций

Связь с std::sync

В общем случае следует рассмотреть использование типов std::sync вместо типов из этого крейта.

Есть два основных случая использования типов из этого крейта:

  1. Вам нужно использовать примитив синхронизации в среде no_std
  2. Вам нужно удерживать блокировку через точку .await (Удержание блокировки std::sync через .await сделает ваш фьючерс не-Send, а также с высокой вероятностью вызовет взаимные блокировки)

Если вы уже используете libstd и не удерживаете блокировки через точки await (существует Clippy lint под названием await_holding_lock, который выдает предупреждения для этого сценария), вам следует рассмотреть std::sync вместо этого крейта. Эти типы оптимизированы для текущей операционной системы, менее сложны и обычно намного быстрее.

В отличие от этого, система уведомлений async-lock использует std::sync::Mutex внутри, если включена функция std, и переходит к значительно более медленной стратегии, если она не включена. Таким образом, существует мало случаев, когда async-lock выигрывает в производительности по сравнению с std::sync.

Модули

МодульОписание
futuresИменованные фьючерсы для использования с примитивами async_lock

Структуры

СтруктураОписание
BarrierСчетчик для синхронизации нескольких задач одновременно
BarrierWaitResultВозвращается Barrier::wait(), когда все задачи вызвали его
MutexАсинхронный мьютекс
MutexGuardОхранник, который освобождает мьютекс при уничтожении
MutexGuardArcВладеющий охранник, который освобождает мьютекс при уничтожении
OnceCellОбласть памяти, в которую можно записать не более одного раза
RwLockАсинхронная блокировка читатель-писатель
RwLockReadGuardОхранник, который освобождает блокировку чтения при уничтожении
RwLockReadGuardArcВладеющий, подсчитывающий ссылки охранник, который освобождает блокировку чтения при уничтожении
RwLockUpgradableReadGuardОхранник, который освобождает улучшаемую блокировку чтения при уничтожении
RwLockUpgradableReadGuardArcВладеющий, подсчитывающий ссылки охранник, который освобождает улучшаемую блокировку чтения при уничтожении
RwLockWriteGuardОхранник, который освобождает блокировку записи при уничтожении
RwLockWriteGuardArcВладеющий, подсчитывающий ссылки охранник, который освобождает блокировку записи при уничтожении
SemaphoreСчетчик для ограничения количества одновременных операций
SemaphoreGuardОхранник, который освобождает полученное разрешение
SemaphoreGuardArcВладеющий охранник, который освобождает полученное разрешение

Крейт net

Источник: Документация async-net

Асинхронные примитивы для сетевого взаимодействия через TCP/UDP/Unix.

Этот крейт представляет собой асинхронную версию std::net и std::os::unix::net.

Реализация

Этот крейт использует async-io для асинхронного I/O и blocking для DNS-запросов.

Примеры

Простой UDP-сервер, который возвращает сообщения обратно отправителю:

#![allow(unused)]
fn main() {
use async_net::UdpSocket;

let socket = UdpSocket::bind("127.0.0.1:8080").await?;
let mut buf = vec![0u8; 1024];

loop {
    let (n, addr) = socket.recv_from(&mut buf).await?;
    socket.send_to(&buf[..n], &addr).await?;
}
}

Модули

МодульОписание
unixUnix-доменные сокеты

Структуры

СтруктураОписание
AddrParseErrorОшибка, которая может быть возвращена при разборе IP-адреса или адреса сокета
IncomingПоток входящих TCP-подключений
Ipv4AddrIPv4-адрес
Ipv6AddrIPv6-адрес
SocketAddrV4Адрес IPv4-сокета
SocketAddrV6Адрес IPv6-сокета
TcpListenerTCP-сервер, прослушивающий подключения
TcpStreamTCP-подключение
UdpSocketUDP-сокет

Перечисления

ПеречислениеОписание
IpAddrIP-адрес, либо IPv4, либо IPv6
ShutdownВозможные значения, которые могут быть переданы в метод TcpStream::shutdown
SocketAddrИнтернет-адрес сокета, либо IPv4, либо IPv6

Трейты

ТрейтОписание
AsyncToSocketAddrsПреобразует или разрешает адреса в значения SocketAddr

Функции

ФункцияОписание
resolveПреобразует или разрешает адреса в значения SocketAddr

Модуль prelude

Источник: Документация futures-lite::prelude

Трейты Future, Stream, AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek и их расширения.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::prelude::*;
}

Трейты

ТрейтОписание
AsyncBufReadАсинхронное чтение байтов с буферизацией
AsyncReadАсинхронное чтение байтов
AsyncSeekАсинхронный поиск байтов
AsyncWriteАсинхронная запись байтов
FutureФьючерс представляет асинхронное вычисление, обычно получаемое с помощью async
StreamПоток значений, производимых асинхронно
FutureExtТрейт-расширение для Future
StreamExtТрейт-расширение для Stream
AsyncBufReadExtТрейт-расширение для AsyncBufRead
AsyncReadExtТрейт-расширение для AsyncRead
AsyncSeekExtТрейт-расширение для AsyncSeek
AsyncWriteExtТрейт-расширение для AsyncWrite

Крейт process

Источник: Документация async-process

Асинхронный интерфейс для работы с процессами.

Этот крейт представляет собой асинхронную версию std::process.

Реализация

Фоновый поток с именем "async-process" лениво создается при первом использовании, который ожидает завершения порожденных дочерних процессов и затем вызывает системный вызов wait() для очистки процессов-«зомби». Это отличается от API процессов в стандартной библиотеке, где удаление работающего Child приводит к утечке его ресурсов.

Этот крейт использует async-io для асинхронного I/O в Unix-подобных системах и blocking для асинхронного I/O в Windows.

Примеры

Запуск процесса и сбор его вывода:

#![allow(unused)]
fn main() {
use async_process::Command;

let out = Command::new("echo").arg("hello").arg("world").output().await?;
assert_eq!(out.stdout, b"hello world\n");
}

Чтение вывода построчно по мере его создания:

#![allow(unused)]
fn main() {
use async_process::{Command, Stdio};
use futures_lite::{io::BufReader, prelude::*};

let mut child = Command::new("find")
    .arg(".")
    .stdout(Stdio::piped())
    .spawn()?;

let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();

while let Some(line) = lines.next().await {
    println!("{}", line?);
}
}

Модули

МодульОписание
unixUnix-специфичные расширения

Структуры

СтруктураОписание
ChildПорождённый дочерний процесс
ChildStderrДескриптор стандартного ошибочного вывода (stderr) дочернего процесса
ChildStdinДескриптор стандартного ввода (stdin) дочернего процесса
ChildStdoutДескриптор стандартного вывода (stdout) дочернего процесса
CommandПостроитель для порождения процессов
ExitStatusОписывает результат процесса после его завершения
OutputВывод завершенного процесса
StdioОписывает, что делать со стандартным I/O потоком для дочернего процесса при передаче в методы stdin, stdout и stderr команды Command

Функции

ФункцияОписание
driverЗапускает драйвер для асинхронных процессов

Модуль stream

Источник: Документация futures-lite::stream

Комбинаторы для трейта Stream.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![1, 2, 3]);

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(3));
assert_eq!(s.next().await, None);
}

Структуры

СтруктураОписание
AllFutureФьючерс для метода StreamExt::all()
AnyFutureФьючерс для метода StreamExt::any()
BlockOnИтератор для функции block_on()
ChainПоток для метода StreamExt::chain()
ClonedПоток для метода StreamExt::cloned()
CollectFutureФьючерс для метода StreamExt::collect()
CopiedПоток для метода StreamExt::copied()
CountFutureФьючерс для метода StreamExt::count()
CycleПоток для метода StreamExt::cycle()
DrainПоток для метода StreamExt::drain()
EmptyПоток для функции empty()
EnumerateПоток для метода StreamExt::enumerate()
FilterПоток для метода StreamExt::filter()
FilterMapПоток для метода StreamExt::filter_map()
FindFutureФьючерс для метода StreamExt::find()
FindMapFutureФьючерс для метода StreamExt::find_map()
FlatMapПоток для метода StreamExt::flat_map()
FlattenПоток для метода StreamExt::flatten()
FoldFutureФьючерс для метода StreamExt::fold()
ForEachFutureФьючерс для метода StreamExt::for_each()
FuseПоток для метода StreamExt::fuse()
InspectПоток для метода StreamExt::inspect()
IterПоток для функции iter()
LastFutureФьючерс для метода StreamExt::last()
MapПоток для метода StreamExt::map()
NextFutureФьючерс для метода StreamExt::next()
NthFutureФьючерс для метода StreamExt::nth()
OnceПоток для функции once()
OnceFutureПоток для функции once_future()
OrПоток для функции or() и метода StreamExt::or()
PartitionFutureФьючерс для метода StreamExt::partition()
PendingПоток для функции pending()
PollFnПоток для функции poll_fn()
PositionFutureФьючерс для метода StreamExt::position()
RaceПоток для функции race() и метода StreamExt::race()
RepeatПоток для функции repeat()
RepeatWithПоток для функции repeat_with()
ScanПоток для метода StreamExt::scan()
SkipПоток для метода StreamExt::skip()
SkipWhileПоток для метода StreamExt::skip_while()
StepByПоток для метода StreamExt::step_by()
TakeПоток для метода StreamExt::take()
TakeWhileПоток для метода StreamExt::take_while()
ThenПоток для метода StreamExt::then()
TryCollectFutureФьючерс для метода StreamExt::try_collect()
TryFoldFutureФьючерс для метода StreamExt::try_fold()
TryForEachFutureФьючерс для метода StreamExt::try_for_each()
TryNextFutureФьючерс для метода StreamExt::try_next()
TryUnfoldПоток для функции try_unfold()
UnfoldПоток для функции unfold()
UnzipFutureФьючерс для метода StreamExt::unzip()
ZipПоток для метода StreamExt::zip()

Трейты

ТрейтОписание
StreamПоток значений, производимых асинхронно
StreamExtТрейт-расширение для Stream

Функции

ФункцияОписание
block_onПреобразует поток в блокирующий итератор
emptyСоздает пустой поток
iterСоздает поток из итератора
onceСоздает поток, который выдает один элемент
once_futureСоздает поток, который вызывает заданный фьючерс как свой первый элемент, а затем больше не производит элементов
orОбъединяет два потока, предпочитая элементы из stream1, когда оба потока готовы
pendingСоздает поток, который всегда находится в состоянии ожидания
poll_fnСоздает поток из функции, возвращающей Poll
raceОбъединяет два потока без предпочтений, когда оба готовы
race_with_seedСоревнует два потока, но с предоставленным пользователем сидом для случайности
repeatСоздает бесконечный поток, который повторно выдает один и тот же элемент
repeat_withСоздает бесконечный поток из замыкания, генерирующего элементы
try_unfoldСоздает поток из начального значения и замыкания с возможностью ошибки, работающего с ним
unfoldСоздает поток из начального значения и асинхронного замыкания, работающего с ним

Псевдонимы типов

ПсевдонимОписание
BoxedПсевдоним типа для Pin<Box<dyn Stream<Item = T> + Send + 'static>>
BoxedLocalПсевдоним типа для Pin<Box<dyn Stream<Item = T> + 'static>>

Макросы

Макрос pin

Источник: Документация futures-lite::pin

#![allow(unused)]
fn main() {
macro_rules! pin {
    ($($x:ident),* $(,)?) => { ... };
}
}

Фиксирует переменную типа T в стеке и перепривязывает её как Pin<&mut T>.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::{future, pin};
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::time::Instant;

// Инспектирует каждый вызов `Future::poll()`.
async fn inspect<T: Debug>(f: impl Future<Output = T>) -> T {
    pin!(f);
    future::poll_fn(|cx| dbg!(f.as_mut().poll(cx))).await
}

let f = async { 1 + 2 };
inspect(f).await;
}

Описание

Макрос pin! используется для безопасного закрепления (pinning) переменных в стеке. Это особенно полезно при работе с асинхронным кодом, где необходимо гарантировать, что данные не будут перемещены в памяти после того, как на них созданы ссылки в асинхронных фьючерсах.

Как работает

  • Принимает один или несколько идентификаторов переменных
  • Для каждой переменной создает закрепленную ссылку Pin<&mut T>
  • Перепривязывает исходную переменную к закрепленной версии

Использование

#![allow(unused)]
fn main() {
use futures_lite::pin;

let mut value = SomeFuture::new();
pin!(value); // Теперь `value` имеет тип `Pin<&mut SomeFuture>`

// Можно использовать закрепленную переменную
let pinned = value.as_mut();
}

Макрос обеспечивает безопасное закрепление, которое необходимо для вызова методов poll на фьючерсах и других типах, требующих гарантий неперемещаемости.

Макрос ready

Источник: Документация futures-lite::ready

#![allow(unused)]
fn main() {
macro_rules! ready {
    ($e:expr $(,)?) => { ... };
}
}

Разворачивает Poll<T> или возвращает Pending.

Примеры

#![allow(unused)]
fn main() {
use futures_lite::{future, prelude::*, ready};
use std::pin::Pin;
use std::task::{Context, Poll};

fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
    let mut fut = future::ready(42);
    let fut = Pin::new(&mut fut);

    let num = ready!(fut.poll(cx));
    // ... используем num

    Poll::Ready(())
}
}

Как работает

Вызов макроса ready! раскрывается в:

#![allow(unused)]
fn main() {
let num = match fut.poll(cx) {
    Poll::Ready(t) => t,
    Poll::Pending => return Poll::Pending,
};
}

Описание

Макрос ready! используется для упрощения работы с типами Poll<T> в асинхронном коде. Он позволяет:

  • Извлечь значение из Poll::Ready(T)
  • Немедленно вернуть Poll::Pending, если результат еще не готов

Это особенно полезно при реализации методов poll вручную, где необходимо последовательно опрашивать несколько фьючерсов или асинхронных операций.

Типичное использование

#![allow(unused)]
fn main() {
use futures_lite::ready;
use std::task::{Context, Poll};

fn poll_multiple_operations(
    cx: &mut Context<'_>,
) -> Poll<Result<(), MyError>> {
    // Опрашиваем первую операцию
    let result1 = ready!(self.op1.poll(cx))?;
    
    // Опрашиваем вторую операцию
    let result2 = ready!(self.op2.poll(cx))?;
    
    Poll::Ready(Ok(()))
}
}

Макрос автоматически обрабатывает распространение ошибок, если тип Poll содержит Result, и обеспечивает корректное возвращение управления при Poll::Pending.

Структуры

Структура Async

Источник: Документация smol::Async

#![allow(unused)]
fn main() {
pub struct Async<T> { /* private fields */ }
}

Асинхронный адаптер для I/O типов.

Этот тип переводит I/O дескриптор в неблокирующий режим, регистрирует его в epoll/kqueue/event ports/IOCP и предоставляет асинхронный интерфейс для работы с ним.

Особенности

Async является низкоуровневым примитивом и имеет некоторые ограничения:

  • Для более высокоуровневых примитивов, построенных на основе Async, используйте async-net или async-process (на Unix)
  • Небезопасно обращаться к внутреннему I/O источнику изменяемо с помощью этого примитива
  • Трейты AsyncRead и AsyncWrite не реализованы по умолчанию, если не гарантировано, что ресурс не будет инвалидирован при чтении или записи

Поддерживаемые типы

Async поддерживает все сетевые типы, а также некоторые OS-специфичные файловые дескрипторы, такие как timerfd и inotify.

Не используйте Async с типами типа File, Stdin, Stdout или Stderr, поскольку все операционные системы имеют проблемы с ними в неблокирующем режиме.

Конкурентный I/O

&Async<T> реализует AsyncRead и AsyncWrite, если &T реализует эти трейты, что означает, что задачи могут конкурентно читать и писать с использованием общих ссылок.

Важно: только одна задача может читать за раз и только одна задача может писать за раз. Допустимо иметь две задачи, где одна читает, а другая пишет одновременно, но недопустимо иметь две задачи, читающие одновременно или пишущие одновременно.

Закрытие

Закрытие стороны записи Async с помощью close() просто выполняет сброс. Если вы хотите завершить работу TCP или Unix сокета, используйте Shutdown.

Примеры

Подключение к серверу и возврат входящих сообщений обратно на сервер:

#![allow(unused)]
fn main() {
use async_io::Async;
use futures_lite::io;
use std::net::TcpStream;

// Подключаемся к локальному серверу.
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;

// Копируем все сообщения из стороны чтения потока в сторону записи.
io::copy(&stream, &stream).await?;
}

Реализации

Источник: Документация smol::Async

Конструкторы

МетодСигнатураОписание
newpub fn new(io: T) -> Result<Async<T>>Создает новый асинхронный I/O дескриптор

Методы доступа

МетодСигнатураОписание
get_refpub fn get_ref(&self) -> &TПолучает ссылку на внутренний дескриптор
get_mutpub fn get_mut(&mut self) -> &mut TПолучает изменяемую ссылку на внутренний дескриптор
into_innerpub fn into_inner(self) -> Result<T>Извлекает внутренний дескриптор

Методы для чтения/записи

МетодСигнатураОписание
read_withpub fn read_with<R>(&self, op: impl FnOnce(&T) -> Result<R>) -> impl Future<Output = Result<R>>Выполняет блокирующую операцию чтения асинхронно
read_with_mutpub fn read_with_mut<R>(&self, op: impl FnOnce(&mut T) -> Result<R>) -> impl Future<Output = Result<R>>Выполняет блокирующую операцию чтения с изменяемым доступом
write_withpub fn write_with<R>(&self, op: impl FnOnce(&T) -> Result<R>) -> impl Future<Output = Result<R>>Выполняет блокирующую операцию записи асинхронно
write_with_mutpub fn write_with_mut<R>(&self, op: impl FnOnce(&mut T) -> Result<R>) -> impl Future<Output = Result<R>>Выполняет блокирующую операцию записи с изменяемым доступом

Методы доступности

МетодСигнатураОписание
readablepub fn readable(&self) -> Readable<'_, T>Создает фьючерс для ожидания доступности для чтения
writablepub fn writable(&self) -> Writable<'_, T>Создает фьючерс для ожидания доступности для записи
poll_readablepub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<()>>Опрашивает доступность для чтения
poll_writablepub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<()>>Опрашивает доступность для записи

Сетевые методы (для TcpStream)

МетодСигнатураОписание
connectpub async fn connect(addr: impl ToSocketAddrs) -> Result<Async<TcpStream>>Асинхронно подключается к адресу
peer_addrpub fn peer_addr(&self) -> Result<SocketAddr>Возвращает адрес удаленного peer
local_addrpub fn local_addr(&self) -> Result<SocketAddr>Возвращает локальный адрес
shutdownpub fn shutdown(&self, how: Shutdown) -> Result<()>Завершает часть соединения

Сетевые методы (для TcpListener)

МетодСигнатураОписание
bindpub fn bind(addr: impl ToSocketAddrs) -> Result<Async<TcpListener>>Привязывает сокет к адресу
acceptpub async fn accept(&self) -> Result<(Async<TcpStream>, SocketAddr)>Асинхронно принимает входящее подключение
local_addrpub fn local_addr(&self) -> Result<SocketAddr>Возвращает локальный адрес

Сетевые методы (для UdpSocket)

МетодСигнатураОписание
bindpub fn bind(addr: impl ToSocketAddrs) -> Result<Async<UdpSocket>>Привязывает UDP сокет к адресу
connectpub async fn connect(&self, addr: impl ToSocketAddrs) -> Result<()>Подключает UDP сокет к адресу
sendpub async fn send(&self, buf: &[u8]) -> Result<usize>Отправляет данные в подключенный адрес
recvpub async fn recv(&self, buf: &mut [u8]) -> Result<usize>Получает данные из подключенного адреса
send_topub async fn send_to(&self, buf: &[u8], addr: impl ToSocketAddrs) -> Result<usize>Отправляет данные на указанный адрес
recv_frompub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)>Получает данные и адрес отправителя
local_addrpub fn local_addr(&self) -> Result<SocketAddr>Возвращает локальный адрес
peer_addrpub fn peer_addr(&self) -> Result<SocketAddr>Возвращает адрес удаленного peer

Трейты

Реализации From

ТрейтОписание
From<Async<T>>Преобразование в внутренний тип

Реализации для трейтов асинхронного I/O

ТрейтМетодыОписание
AsyncReadpoll_readАсинхронное чтение данных
AsyncWritepoll_write, poll_flush, poll_closeАсинхронная запись данных
AsyncBufReadpoll_fill_buf, consumeАсинхронное буферизованное чтение
AsyncSeekpoll_seekАсинхронное перемещение по потоку

Другие трейты

ТрейтОписание
DebugОтладочное представление
CloneКлонирование (если T: Clone)
AsRef<T>Получение ссылки на внутренний тип
AsMut<T>Получение изменяемой ссылки на внутренний тип

Зависимости

Структура Async зависит от следующих трейтов и типов:

  • std::io::Result, std::io::Error
  • std::net::ToSocketAddrs, SocketAddr
  • std::net::{TcpStream, TcpListener, UdpSocket}
  • std::future::Future
  • std::task::{Context, Poll}
  • futures_io::{AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek}
  • std::fmt::Debug
  • std::clone::Clone

Вспомогательные типы

ТипОписание
Readable<'a, T>Фьючерс для ожидания доступности чтения
Writable<'a, T>Фьючерс для ожидания доступности записи

Структура Executor

Источник: Документация async-executor::Executor

#![allow(unused)]
fn main() {
pub struct Executor<'a> { /* private fields */ }
}

Асинхронный исполнитель (executor).

Примеры

Многопоточный исполнитель:

#![allow(unused)]
fn main() {
use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::future;

let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();

Parallel::new()
    // Запускаем четыре потока исполнителя.
    .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
    // Запускаем основной фьючерс в текущем потоке.
    .finish(|| future::block_on(async {
        println!("Hello world!");
        drop(signal);
    }));
}

Методы

new

#![allow(unused)]
fn main() {
pub const fn new() -> Executor<'a>
}

Создает новый исполнитель.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;

let ex = Executor::new();
}

is_empty

#![allow(unused)]
fn main() {
pub fn is_empty(&self) -> bool
}

Возвращает true, если нет незавершенных задач.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;

let ex = Executor::new();
assert!(ex.is_empty());

let task = ex.spawn(async {
    println!("Hello world");
});
assert!(!ex.is_empty());

assert!(ex.try_tick());
assert!(ex.is_empty());
}

spawn

#![allow(unused)]
fn main() {
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T>
where
    T: Send + 'a,
}

Запускает задачу в исполнителе.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;

let ex = Executor::new();

let task = ex.spawn(async {
    println!("Hello world");
});
}

spawn_many

#![allow(unused)]
fn main() {
pub fn spawn_many<T, F>(
    &self,
    futures: impl IntoIterator<Item = F>,
    handles: &mut impl Extend<Task<<F as Future>::Output>>,
)
where
    T: Send + 'a,
    F: Future<Output = T> + Send + 'a,
}

Запускает множество задач в исполнителе.

В отличие от метода spawn, этот метод блокирует внутреннюю блокировку задач исполнителя один раз и запускает все задачи за одну операцию. При большом количестве задач это может улучшить конкуренцию.

Пример:

#![allow(unused)]
fn main() {
use async_executor::Executor;
use futures_lite::{stream, prelude::*};
use std::future::ready;

let mut ex = Executor::new();

let futures = [
    ready(1),
    ready(2),
    ready(3)
];

// Запускаем все фьючерсы в исполнителе одновременно.
let mut tasks = vec![];
ex.spawn_many(futures, &mut tasks);

// Ожидаем завершения всех.
let results = ex.run(async move {
    stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
}).await;
assert_eq!(results, [1, 2, 3]);
}

try_tick

#![allow(unused)]
fn main() {
pub fn try_tick(&self) -> bool
}

Пытается выполнить задачу, если хотя бы одна запланирована.

Выполнение запланированной задачи означает просто однократное опрашивание ее фьючерса.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;

let ex = Executor::new();
assert!(!ex.try_tick()); // нет задач для выполнения

let task = ex.spawn(async {
    println!("Hello world");
});
assert!(ex.try_tick()); // задача найдена и выполнена
}

tick

#![allow(unused)]
fn main() {
pub async fn tick(&self)
}

Выполняет одну задачу.

Выполнение задачи означает просто однократное опрашивание ее фьючерса.

Если при вызове этого метода нет запланированных задач, он будет ждать, пока одна не появится.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;
use futures_lite::future;

let ex = Executor::new();

let task = ex.spawn(async {
    println!("Hello world");
});
future::block_on(ex.tick()); // выполняет задачу
}

run

#![allow(unused)]
fn main() {
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T
}

Запускает исполнитель до завершения указанного фьючерса.

Примеры:

#![allow(unused)]
fn main() {
use async_executor::Executor;
use futures_lite::future;

let ex = Executor::new();

let task = ex.spawn(async { 1 + 2 });
let res = future::block_on(ex.run(async { task.await * 2 }));

assert_eq!(res, 6);
}

Реализации трейтов

Debug

#![allow(unused)]
fn main() {
impl Debug for Executor<'_>
}
  • fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> - Форматирует значение с помощью заданного форматировщика.

Default

#![allow(unused)]
fn main() {
impl<'a> Default for Executor<'a>
}
  • fn default() -> Executor<'a> - Возвращает "значение по умолчанию" для типа.

Drop

#![allow(unused)]
fn main() {
impl Drop for Executor<'_>
}
  • fn drop(&mut self) - Выполняет деструктор для этого типа.

Маркерные трейты

  • RefUnwindSafe - Безопасен для размотки через ссылку
  • Send - Может быть передан между потоками
  • Sync - Может быть разделен между потоками
  • UnwindSafe - Безопасен для размотки

Автоматические реализации трейтов

  • !Freeze - Не замораживается
  • Unpin - Может быть перемещено после закрепления

Стандартные реализации

ТрейтОписание
AnyПроверка типов во время выполнения
Borrow<T>Заимствование как &T
BorrowMut<T>Заимствование как &mut T
From<T>Преобразование из типа T
InstrumentИнструментирование для трассировки
Into<U>Преобразование в тип U
TryFrom<U>Попытка преобразования из типа U
TryInto<U>Попытка преобразования в тип U
WithSubscriberРабота с подписчиками

Основные методы

МетодСигнатураВозвращаемый типОписание
newpub fn new() -> Executor<'a>Executor<'a>Создает новый глобальный исполнитель
spawnpub fn spawn<T>(future: impl Future<Output = T> + Send + 'static) -> Task<T>Task<T>Запускает задачу в глобальном исполнителе
block_onpub fn block_on<T>(future: impl Future<Output = T>) -> TTБлокирует текущий поток до завершения фьючерса

Статические методы

МетодСигнатураВозвращаемый типОписание
globalpub fn global() -> &'static Executor<'static>&'static ExecutorВозвращает ссылку на глобальный исполнитель

Методы экземпляра

МетодСигнатураВозвращаемый типОписание
runpub fn run<T>(&self, future: impl Future<Output = T>) -> TTЗапускает исполнитель до завершения фьючерса
try_tickpub fn try_tick(&self) -> boolboolПытается выполнить одну задачу, если есть готовые
tickpub async fn tick(&self)()Выполняет одну задачу, ожидая если нет готовых

Зависимости типов

Основные типы

ТипИсточникНазначение
Task<T>smol::TaskПредставляет запущенную асинхронную задачу
Futurestd::future::FutureБазовый трейт для асинхронных вычислений
Executor<'a>smol::ExecutorОсновной тип исполнителя

Трейты времени выполнения

ТрейтНазначение
SendГарантирует безопасную передачу между потоками
'staticГарантирует время жизни на всю программу

Реализации трейтов

Стандартные трейты

ТрейтРеализацияНазначение
Debugimpl Debug for Executor<'_>Отладочное представление
Defaultimpl Default for Executor<'_>Создание значения по умолчанию
Dropimpl Drop for Executor<'_>Управление временем жизни

Маркерные трейты

ТрейтНазначение
SendБезопасная передача между потоками
SyncБезопасное разделение между потоками
UnwindSafeБезопасность при размотке стека
RefUnwindSafeБезопасность ссылок при размотке

Автоматические реализации

ТрейтОсобенности
UnpinМожет быть перемещено после закрепления
!FreezeНе может быть заморожено

Особенности использования

Глобальный исполнитель

#![allow(unused)]
fn main() {
// Получение глобального исполнителя
let executor = Executor::global();

// Запуск задачи в глобальном исполнителе
let task = executor.spawn(async {
    // асинхронный код
});
}

Локальное использование

#![allow(unused)]
fn main() {
// Создание локального исполнителя
let executor = Executor::new();

// Блокировка до завершения
executor.block_on(async {
    // асинхронный код
});
}

Ограничения и требования

  • Задачи должны реализовывать Send + 'static
  • Исполнитель предназначен для многопоточного использования
  • block_on блокирует текущий поток
  • spawn не блокирует и возвращает Task сразу

Структура LocalExecutor - Сводная таблица методов

Источник: Документация async-executor::LocalExecutor

  1. Только один поток - нельзя передавать между потоками
  2. Нет Send - задачи не должны быть Send
  3. Локальное время жизни - привязан к области видимости
  4. Блокирующие итераторы - могут заблокировать исполнитель

Сравнение с Executor

ХарактеристикаLocalExecutorExecutor
Многопоточность❌ Только один поток✅ Многопоточный
Требование Send❌ Не требуется✅ Обязательно
Производительность✅ Выше для локальных задач✅ Лучше для CPU-bound
Безопасность✅ Нет гонок данных✅ Синхронизированный

Основные методы

МетодСигнатураВозвращаемый типОписаниеОсобенности
newpub const fn new() -> LocalExecutor<'a>LocalExecutor<'a>Создает однопоточный исполнительТолько для текущего потока
is_emptypub fn is_empty(&self) -> boolboolПроверяет отсутствие незавершенных задачtrue если задач нет
spawnpub fn spawn<T>(future: impl Future<Output = T> + 'a) -> Task<T>Task<T>Запускает задачу в исполнителеНе требует Send
spawn_manypub fn spawn_many<T, F>(futures: impl IntoIterator<Item = F>, handles: &mut impl Extend<Task<F::Output>>)()Массовый запуск задачУлучшает производительность
try_tickpub fn try_tick(&self) -> boolboolПытается выполнить одну задачуНеблокирующий
tickpub async fn tick(&self)()Выполняет одну задачуБлокирует если задач нет
runpub async fn run<T>(future: impl Future<Output = T>) -> TTЗапускает исполнитель до завершения фьючерсаОсновной метод выполнения

Подробное описание методов

Конструкторы

МетодОсобенностиПример использования
newСоздает локальный исполнитель для текущего потокаlet ex = LocalExecutor::new();
defaultЧерез трейт Default (аналогично new)let ex = LocalExecutor::default();

Управление задачами

МетодНазначениеОтличие от Executor
spawnЗапуск одной задачиНе требует Send
spawn_manyПакетный запуск задачБлокировка не снимается (нет других потоков)
is_emptyПроверка состоянияАналогично Executor

Выполнение

МетодТип выполненияПрименение
try_tickНеблокирующийДля фонового выполнения
tickАсинхронныйДля кооперативной многозадачности
runБлокирующийОсновной цикл выполнения

Требования к типам

Для задач

ПараметрТребованиеОбоснование
T'aВремя жизни задачи
FFuture<Output = T> + 'aЛокальный фьючерс
ОтсутствуетSendТолько один поток

Реализации трейтов

Стандартные трейты

ТрейтНазначениеОсобенности
DebugОтладочный выводФорматирование
DefaultЗначение по умолчаниюАналогично new()

Маркерные трейты (отличия от Executor)

ТрейтРеализацияОбоснование
!SendНе реализованТолько для текущего потока
!SyncНе реализованНельзя разделять между потоками
UnwindSafeРеализованБезопасность при панике
RefUnwindSafeРеализованБезопасность ссылок при панике

Автоматические реализации

ТрейтСтатусПримечание
UnpinРеализованМожно перемещать
!FreezeНе замораживается-

Примеры использования

Базовое использование

#![allow(unused)]
fn main() {
use async_executor::LocalExecutor;
use futures_lite::future;

let local_ex = LocalExecutor::new();

future::block_on(local_ex.run(async {
    println!("Hello world!");
}));
}

Запуск задач

#![allow(unused)]
fn main() {
let ex = LocalExecutor::new();

// Одиночная задача
let task = ex.spawn(async { 1 + 2 });

// Массовый запуск
let mut tasks = vec![];
ex.spawn_many([async { 1 }, async { 2 }], &mut tasks);
}

Структура Task

Источник: Документация smol::Task

#![allow(unused)]
fn main() {
pub struct Task<T, M = ()> { /* private fields */ }
}

Запущенная задача.

Описание

Задачу Task можно ожидать (await) для получения результата ее фьючерса.

Удаление (dropping) задачи Task отменяет ее, что означает, что ее фьючерс больше не будет опрашиваться. Чтобы удалить дескриптор задачи без ее отмены, используйте вместо этого метод detach(). Для грациозной отмены задачи и ожидания ее полного уничтожения используйте метод cancel().

Важно: отмена задачи фактически пробуждает ее и перепланирует один последний раз. Затем исполнитель может уничтожить задачу, просто удалив ее Runnable или вызвав run().

Параметры типа

  • T - тип результата, возвращаемого задачей
  • M - тип метаданных (по умолчанию ())

Примеры

#![allow(unused)]
fn main() {
use smol::{future, Executor};
use std::thread;

let ex = Executor::new();

// Запускаем фьючерс в исполнителе.
let task = ex.spawn(async {
    println!("Hello from a task!");
    1 + 2
});

// Запускаем поток исполнителя.
thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));

// Ожидаем результат задачи.
assert_eq!(future::block_on(task), 3);
}
  • Ожидание результата: Задачи можно ожидать через await для получения результата
  • Автоматическая отмена: Удаление задачи приводит к ее отмене
  • Грациозная отмена: Метод cancel() обеспечивает корректное завершение
  • Фоновое выполнение: detach() позволяет задаче работать в фоне

Основные методы

МетодСигнатураВозвращаемый типОписание
detachpub fn detach(self)()Отсоединяет задачу для фонового выполнения
cancelpub async fn cancel(self) -> Option<T>Option<T>Отменяет задачу и ждет завершения
falliblepub fn fallible(self) -> FallibleTask<T, M>FallibleTask<T, M>Преобразует в FallibleTask
is_finishedpub fn is_finished(&self) -> boolboolПроверяет завершена ли задача
metadatapub fn metadata(&self) -> &M&MВозвращает метаданные задачи

Подробное описание методов

Управление выполнением

МетодНазначениеОсобенностиПример использования
detachОтсоединяет задачуЗадача продолжает работать в фонеtask.detach()
cancelГрациозная отменаЖдет завершения, возвращает результатtask.cancel().await
fallibleСоздает fallible версиюОбрабатывает отмену через Nonetask.fallible()

Состояние и метаданные

МетодНазначениеПримечания
is_finishedПроверка состоянияМожет измениться сразу после вызова
metadataДоступ к метаданнымПо умолчанию ()

Примеры использования

Базовое использование

#![allow(unused)]
fn main() {
use smol::{future, Executor};
use std::thread;

let ex = Executor::new();

// Запуск задачи в исполнителе
let task = ex.spawn(async {
    println!("Hello from a task!");
    1 + 2
});

// Ожидание результата
assert_eq!(future::block_on(task), 3);
}

Отсоединение задачи

#![allow(unused)]
fn main() {
use smol::{Executor, Timer};
use std::time::Duration;

let ex = Executor::new();

// Демон-задача
ex.spawn(async {
    loop {
        println!("I'm a daemon task looping forever.");
        Timer::after(Duration::from_secs(1)).await;
    }
})
.detach();
}

Грациозная отмена

#![allow(unused)]
fn main() {
use smol::{future, Executor, Timer};
use std::thread;
use std::time::Duration;

let ex = Executor::new();
let task = ex.spawn(async {
    loop {
        println!("You can still cancel me!");
        Timer::after(Duration::from_secs(1)).await;
    }
});

future::block_on(async {
    Timer::after(Duration::from_secs(3)).await;
    task.cancel().await; // Грациозная отмена
});
}

Реализации трейтов

Основные трейты

ТрейтРеализацияНазначение
Futureimpl<T, M> Future for Task<T, M>Ожидание результата задачи
Debugimpl<T, M> Debug for Task<T, M>Отладочное представление
Dropimpl<T, M> Drop for Task<T, M>Автоматическая отмена при удалении

Маркерные трейты

ТрейтУсловияНазначение
SendT: Send, M: Send + SyncПередача между потоками
SyncM: Send + SyncРазделение между потоками
UnpinВсегдаМожно перемещать после закрепления
UnwindSafeВсегдаБезопасность при панике
RefUnwindSafeВсегдаБезопасность ссылок при панике
FreezeВсегдаМожет быть заморожено

Типы и зависимости

Параметры типа

ПараметрОграниченияНазначение
TЛюбой типРезультат выполнения задачи
MПо умолчанию ()Метаданные задачи

Связанные типы

ТипНазначение
FallibleTask<T, M>Задача с обработкой отмены
Output = TТип результата для Future

Поведение при удалении

ДействиеРезультатАльтернатива
drop(task)Немедленная отменаtask.detach()
task.cancel().awaitГрациозная отмена-
task.detach()Продолжение выполнения-

Особенности многопоточности

  • is_finished() может изменить состояние сразу после вызова
  • Отмена задачи пробуждает ее для последнего выполнения
  • Исполнитель может уничтожить задачу через Runnable

Таблица сравнения методов отмены

МетодТипВозвращаетПоведение
drop()Синхронный-Немедленная отмена
cancel().awaitАсинхронныйOption<T>Грациозная отмена
detach()Синхронный-Без отмены

Трейты Future

Реализация Future

МетодСигнатураНазначение
pollfn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T>Опрос готовности результата
Outputtype Output = TТип выходного значения

Безопасность

  • Send: Только если T: Send и M: Send + Sync
  • Sync: Только если M: Send + Sync
  • UnwindSafe: Всегда безопасно для размотки
  • Panic Safety: Отмена безопасна при панике

Структура Timer

Источник: Документация async-io::Timer

#![allow(unused)]
fn main() {
pub struct Timer { /* private fields */ }
}

Фьючерс или поток, который эмитирует события по времени.

Описание

Таймеры - это фьючерсы, которые выводят единственный Instant при срабатывании.

Таймеры также являются потоками, которые могут периодически выводить Instant.

Точность

Существует ограничение на максимальную точность, которую может обеспечить Timer. Этот предел зависит от текущей платформы; например, в Windows максимальная точность составляет около 16 миллисекунд. Из-за этого ограничения таймер может спать дольше запрошенной длительности. Он никогда не будет спать меньше.

Примеры

Ожидание 1 секунды:

#![allow(unused)]
fn main() {
use async_io::Timer;
use std::time::Duration;

Timer::after(Duration::from_secs(1)).await;
}

Таймаут через 1 секунду:

#![allow(unused)]
fn main() {
use async_io::Timer;
use futures_lite::FutureExt;
use std::time::Duration;

let addrs = async_net::resolve("google.com:80")
    .or(async {
        Timer::after(Duration::from_secs(1)).await;
        Err(std::io::ErrorKind::TimedOut.into())
    })
    .await?;
}

Методы

Создание таймеров

МетодСигнатураОписание
neverpub fn never() -> TimerСоздает таймер, который никогда не сработает
afterpub fn after(duration: Duration) -> TimerСоздает таймер, который сработает один раз через указанную длительность
atpub fn at(instant: Instant) -> TimerСоздает таймер, который сработает один раз в указанный момент времени
intervalpub fn interval(period: Duration) -> TimerСоздает таймер, который срабатывает периодически
interval_atpub fn interval_at(start: Instant, period: Duration) -> TimerСоздает таймер, который срабатывает периодически, начиная с start

Управление таймерами

МетодСигнатураОписание
will_firepub fn will_fire(&self) -> boolПоказывает, сработает ли когда-либо этот таймер
set_afterpub fn set_after(&mut self, duration: Duration)Устанавливает таймер на срабатывание один раз через указанную длительность
set_atpub fn set_at(&mut self, instant: Instant)Устанавливает таймер на срабатывание один раз в указанный момент
set_intervalpub fn set_interval(&mut self, period: Duration)Устанавливает таймер на периодическое срабатывание
set_interval_atpub fn set_interval_at(&mut self, start: Instant, period: Duration)Устанавливает таймер на периодическое срабатывание, начиная с start

Примеры использования

Таймер, который никогда не срабатывает

#![allow(unused)]
fn main() {
use async_io::Timer;
use futures_lite::prelude::*;
use std::time::Duration;

async fn run_with_timeout(timeout: Option<Duration>) {
    let timer = timeout
        .map(|timeout| Timer::after(timeout))
        .unwrap_or_else(Timer::never);

    run_lengthy_operation().or(timer).await;
}

// Таймаут через 5 секунд.
run_with_timeout(Some(Duration::from_secs(5))).await;
// Без таймаута.
run_with_timeout(None).await;
}

Интервальный таймер

#![allow(unused)]
fn main() {
use async_io::Timer;
use futures_lite::StreamExt;
use std::time::{Duration, Instant};

let period = Duration::from_secs(1);
Timer::interval(period).next().await;
}

Проверка срабатывания таймера

#![allow(unused)]
fn main() {
use async_io::Timer;
use futures_lite::prelude::*;
use std::time::Duration;

// `never` никогда не сработает.
assert!(!Timer::never().will_fire());

// `after` сработает, если длительность не слишком большая.
assert!(Timer::after(Duration::from_secs(1)).will_fire());
assert!(!Timer::after(Duration::MAX).will_fire());

// Однако после срабатывания таймер `after` больше никогда не сработает.
let mut t = Timer::after(Duration::from_secs(1));
assert!(t.will_fire());
(&mut t).await;
assert!(!t.will_fire());

// Интервальные таймеры срабатывают периодически.
let mut t = Timer::interval(Duration::from_secs(1));
assert!(t.will_fire());
t.next().await;
assert!(t.will_fire());
}

Реализации трейтов

Future

Таймер реализует Future<Output = Instant>, что позволяет использовать его с await:

#![allow(unused)]
fn main() {
impl Future for Timer {
    type Output = Instant;
    
    fn poll(self: Pin<&mut Timer>, cx: &mut Context<'_>) -> Poll<Instant>;
}
}

Stream

Таймер также реализует Stream<Item = Instant> для периодического использования:

#![allow(unused)]
fn main() {
impl Stream for Timer {
    type Item = Instant;
    
    fn poll_next(self: Pin<&mut Timer>, cx: &mut Context<'_>) -> Poll<Option<Instant>>;
    fn size_hint(&self) -> (usize, Option<usize>);
}
}

Другие трейты

  • Debug - отладочное представление
  • Drop - управление временем жизни
  • Send, Sync - безопасность для многопоточности
  • Unpin - можно перемещать после закрепления

Особенности

  • Переиспользование wake-ов: Методы set_* не удаляют waker связанной задачи
  • Платформенные ограничения: Точность зависит от ОС
  • Гибкость: Может использоваться как фьючерс и как поток

Структура Unblock

Источник: Документация blocking::Unblock

#![allow(unused)]
fn main() {
pub struct Unblock<T> { /* private fields */ }
}

Выполняет блокирующий I/O в пуле потоков.

Описание

Блокирующий I/O должен быть изолирован от асинхронного кода. Этот тип перемещает блокирующие I/O операции в специальный пул потоков, предоставляя при этом знакомый асинхронный интерфейс.

Этот тип реализует трейты Stream, AsyncRead, AsyncWrite или AsyncSeek, если внутренний тип реализует Iterator, Read, Write или Seek соответственно.

Особенности

Unblock является низкоуровневым примитивом и имеет некоторые ограничения:

Для более высокоуровневых примитивов, построенных на основе Unblock, используйте async-fs или async-process (на Windows).

Unblock взаимодействует с I/O операциями в пуле потоков через канал (pipe). Это означает, что асинхронная операция чтения/записи просто получает/отправляет некоторые байты из/в канал. В режиме чтения пул потоков читает байты из I/O дескриптора и пересылает их в канал, пока он не заполнится. В режиме записи пул потоков читает байты из канала и пересылает их в I/O дескриптор.

Используйте Unblock::with_capacity() для настройки емкости канала.

Предупреждения

Чтение

Если вы создадите Unblock<Stdin>, прочитаете из него некоторые байты, а затем удалите его, заблокированная операция чтения может продолжать висеть в пуле потоков. Следующая попытка чтения из stdin потеряет байты, прочитанные висящей операцией. Это сложная проблема для решения, поэтому убедитесь, что вы используете только один дескриптор stdin в течение всей программы.

Запись

Если вы записываете данные через трейт AsyncWrite, убедитесь, что выполнили сброс (flush) перед удалением дескриптора Unblock, иначе некоторые буферизированные данные могут быть потеряны.

Поиск (Seeking)

Из-за буферизации в канале, если Unblock оборачивает File, одна операция чтения может переместить курсор файла дальше, чем диапазон операции. Фактически, чтение просто продолжается в фоновом режиме, пока канал не заполнится. Имейте это в виду при использовании AsyncSeek с относительными смещениями.

Примеры

#![allow(unused)]
fn main() {
use blocking::Unblock;
use futures_lite::prelude::*;

let mut stdout = Unblock::new(std::io::stdout());
stdout.write_all(b"Hello world!").await?;
stdout.flush().await?;
}

Методы

Создание

МетодСигнатураОписание
newpub fn new(io: T) -> Unblock<T>Оборачивает блокирующий I/O дескриптор в асинхронный интерфейс Unblock
with_capacitypub fn with_capacity(cap: usize, io: T) -> Unblock<T>Оборачивает с пользовательской емкостью буфера

Управление

МетодСигнатураОписание
get_mutpub async fn get_mut(&mut self) -> &mut TПолучает изменяемую ссылку на блокирующий I/O дескриптор
with_mutpub async fn with_mut<R, F>(&mut self, op: F) -> RВыполняет блокирующую операцию над I/O дескриптором
into_innerpub async fn into_inner(self) -> TИзвлекает внутренний блокирующий I/O дескриптор

Примеры использования

Базовое использование

#![allow(unused)]
fn main() {
use blocking::Unblock;

let stdin = Unblock::new(std::io::stdin());
}

Настройка емкости буфера

#![allow(unused)]
fn main() {
use blocking::Unblock;

let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
}

Работа с дескриптором

#![allow(unused)]
fn main() {
use blocking::{unblock, Unblock};
use std::fs::File;

let file = unblock(|| File::create("file.txt")).await?;
let mut file = Unblock::new(file);

// Получение ссылки на дескриптор
let metadata = file.get_mut().await.metadata()?;

// Выполнение операции над дескриптором
let metadata = file.with_mut(|f| f.metadata()).await?;

// Извлечение дескриптора
let file = file.into_inner().await;
}

Реализации трейтов

AsyncRead

Реализован для T: Read + Send + 'static:

  • poll_read() - попытка чтения в буфер
  • poll_read_vectored() - векторизованное чтение

AsyncWrite

Реализован для T: Write + Send + 'static:

  • poll_write() - попытка записи из буфера
  • poll_flush() - сброс буферов
  • poll_close() - закрытие объекта
  • poll_write_vectored() - векторизованная запись

AsyncSeek

Реализован для T: Seek + Send + 'static:

  • poll_seek() - перемещение по потоку

Stream

Реализован для T: Iterator + Send + 'static:

  • poll_next() - получение следующего значения
  • size_hint() - информация о размере

Другие трейты

  • Debug - для T: Debug
  • Send/Sync - при соответствующих ограничениях на T
  • Unpin - можно перемещать после закрепления

Емкости по умолчанию

  • Для типов Iterator: 8192 элементов
  • Для типов Read/Write: 8 МБ

Безопасность

  • !RefUnwindSafe - не безопасен для размотки через ссылку
  • !UnwindSafe - не безопасен для размотки
  • Freeze - может быть заморожен

Функции

Функция block_on

Источник: Документация async-io::block_on

#![allow(unused)]
fn main() {
pub fn block_on<T>(future: impl Future<Output = T>) -> T
}

Блокирует текущий поток на фьючерсе, обрабатывая I/O события в простое.

Описание

Функция block_on выполняет асинхронный фьючерс в текущем потоке, блокируя его до завершения фьючерса. Во время выполнения она обрабатывает I/O события, когда поток простаивает.

Эта функция предоставляет простой способ запуска асинхронного кода в синхронном контексте.

Примеры

#![allow(unused)]
fn main() {
use async_io::Timer;
use std::time::Duration;

async_io::block_on(async {
    // Этот таймер, вероятно, будет обработан текущим
    // потоком, а не резервным потоком "async-io".
    Timer::after(Duration::from_millis(1)).await;
});
}

Особенности реализации

  • Обработка I/O: Во время ожидания завершения фьючерса функция обрабатывает события ввода-вывода
  • Эффективность: Использует текущий поток для обработки, что может быть более эффективно, чем использование отдельного исполнителя
  • Универсальность: Может работать с любым фьючерсом, возвращающим значение типа T

Типичные сценарии использования

Запуск асинхронного кода в синхронном контексте

use async_io::block_on;

fn main() {
    let result = block_on(async {
        // Асинхронные операции
        "Результат асинхронной операции"
    });
    
    println!("{}", result);
}

Ожидание таймаутов

#![allow(unused)]
fn main() {
use async_io::{block_on, Timer};
use std::time::Duration;

block_on(async {
    Timer::after(Duration::from_secs(1)).await;
    println!("Прошла 1 секунда");
});
}

Совместное использование с другими асинхронными примитивами

#![allow(unused)]
fn main() {
use async_io::block_on;
use async_net::TcpStream;
use std::io;

block_on(async {
    match TcpStream::connect("example.com:80").await {
        Ok(_) => println!("Подключение успешно"),
        Err(e) => println!("Ошибка подключения: {}", e),
    }
});
}

Отличие от других исполнителей

В отличие от специализированных исполнителей, block_on:

  • Работает в текущем потоке без создания дополнительных потоков
  • Обрабатывает I/O события только когда фьючерс ожидает
  • Подходит для простых сценариев и тестирования

Ограничения

  • Блокирует текущий поток до завершения фьючерса
  • Не подходит для длительных операций в GUI-приложениях
  • Для сложных сценариев предпочтительнее использовать полноценный исполнитель

Параметры

  • future: Фьючерс, который нужно выполнить. Должен реализовывать Future<Output = T>

Возвращаемое значение

  • Возвращает результат выполнения фьючерса типа T

Функция spawn

Источник: Документация smol::spawn

#![allow(unused)]
fn main() {
pub fn spawn<T: Send + 'static>(
    future: impl Future<Output = T> + Send + 'static,
) -> Task<T>
}

Запускает задачу в глобальном исполнителе (по умолчанию однопоточном).

Описание

Существует глобальный исполнитель, который лениво инициализируется при первом использовании. Он включен в эту библиотеку для удобства при написании модульных тестов и небольших программ, но в остальных случаях более целесообразно создавать собственный Executor.

По умолчанию глобальный исполнитель запускается одним фоновым потоком, но вы также можете настроить количество потоков, установив переменную окружения SMOL_THREADS.

Поскольку исполнитель сохраняется навсегда, метод drop не вызывается для задач при выходе из программы.

Примеры

#![allow(unused)]
fn main() {
let task = smol::spawn(async {
    1 + 2
});

smol::block_on(async {
    assert_eq!(task.await, 3);
});
}

Особенности

Глобальный исполнитель

  • Ленивая инициализация: Создается при первом вызове spawn
  • Конфигурация потоков: Настраивается через переменную окружения SMOL_THREADS
  • Постоянное существование: Исполнитель не уничтожается при завершении программы

Требования к задачам

  • T: Send + 'static - результат должен быть передаваемым между потоками и иметь статическое время жизни
  • future: Send + 'static - фьючерс должен быть передаваемым и иметь статическое время жизни

Примеры использования

Базовая задача

#![allow(unused)]
fn main() {
use smol::{spawn, block_on};

let task = spawn(async {
    println!("Задача выполняется");
    42
});

let result = block_on(async {
    task.await
});
assert_eq!(result, 42);
}

Параллельные задачи

#![allow(unused)]
fn main() {
use smol::{spawn, block_on};

let task1 = spawn(async { 1 + 2 });
let task2 = spawn(async { 3 + 4 });

let (result1, result2) = block_on(async {
    (task1.await, task2.await)
});
assert_eq!(result1, 3);
assert_eq!(result2, 7);
}

Долгоиграющие задачи

#![allow(unused)]
fn main() {
use smol::{spawn, Timer};
use std::time::Duration;

let task = spawn(async {
    for i in 0..5 {
        println!("Итерация {}", i);
        Timer::after(Duration::from_secs(1)).await;
    }
    "Завершено"
});

// Задача продолжит выполняться в фоне даже без ожидания
}

Настройка через переменные окружения

# Запуск с 4 рабочими потоками
SMOL_THREADS=4 cargo run

# Запуск с 1 потоком (по умолчанию)
SMOL_THREADS=1 cargo run

Рекомендации по использованию

Когда использовать

  • Тестирование: Удобно для модульных тестов
  • Прототипирование: Быстрое создание прототипов
  • Небольшие программы: Простые приложения и утилиты

Когда не использовать

  • Производственные приложения: Лучше создавать собственный Executor
  • Специфичные требования: При необходимости тонкой настройки
  • Длительные процессы: Из-за особенностей управления временем жизни

Альтернативы

Для более сложных сценариев рекомендуется использовать:

#![allow(unused)]
fn main() {
use smol::Executor;

let executor = Executor::new();
let task = executor.spawn(async {
    // ваша задача
});
}

Безопасность

  • Send + 'static: Гарантирует безопасность между потоками
  • Глобальное состояние: Исполнитель существует всю жизнь программы
  • Автоматическое управление: Не требуется ручное уничтожение задач

Функция unblock

Источник: Документация blocking::unblock

#![allow(unused)]
fn main() {
pub fn unblock<T, F>(f: F) -> Task<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
}

Выполняет блокирующий код в пуле потоков.

Описание

Функция unblock позволяет выполнять блокирующие операции в асинхронном контексте, перемещая их в специальный пул потоков. Это предотвращает блокировку асинхронного исполнителя длительными операциями.

Примеры

Чтение содержимого файла:

#![allow(unused)]
fn main() {
use blocking::unblock;
use std::fs;

let contents = unblock(|| fs::read_to_string("file.txt")).await?;
}

Запуск процесса:

#![allow(unused)]
fn main() {
use blocking::unblock;
use std::process::Command;

let out = unblock(|| Command::new("dir").output()).await?;
}

Особенности

Требования к параметрам

  • F: FnOnce() -> T + Send + 'static - замыкание должно быть выполняемым один раз, передаваемым между потоками и иметь статическое время жизни
  • T: Send + 'static - результат должен быть передаваемым между потоками и иметь статическое время жизни

Пул потоков

  • Выделенные потоки: Использует специальный пул потоков для блокирующих операций
  • Автоматическое управление: Потоки управляются автоматически
  • Изоляция: Предотвращает блокировку асинхронного исполнителя

Примеры использования

Работа с файловой системой

#![allow(unused)]
fn main() {
use blocking::unblock;
use std::fs;

// Чтение файла
let data = unblock(|| fs::read("data.bin")).await?;

// Запись файла
unblock(|| fs::write("output.txt", "Hello world")).await?;

// Создание директории
unblock(|| fs::create_dir_all("path/to/dir")).await?;
}

Вычисления в фоне

#![allow(unused)]
fn main() {
use blocking::unblock;

// Ресурсоемкие вычисления
let result = unblock(|| {
    let mut sum = 0;
    for i in 0..1_000_000 {
        sum += i;
    }
    sum
}).await;

println!("Сумма: {}", result);
}

Сетевые операции

#![allow(unused)]
fn main() {
use blocking::unblock;
use std::net::TcpStream;
use std::io::Write;

let response = unblock(|| {
    let mut stream = TcpStream::connect("example.com:80")?;
    stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")?;
    
    let mut response = String::new();
    std::io::Read::read_to_string(&mut stream, &mut response)?;
    Ok::<String, std::io::Error>(response)
}).await?;
}

Работа с базами данных

#![allow(unused)]
fn main() {
use blocking::unblock;

let users = unblock(|| {
    // Блокирующие операции с БД
    // query_database("SELECT * FROM users")
    vec!["user1", "user2", "user3"] // заглушка
}).await;

for user in users {
    println!("Пользователь: {}", user);
}
}

Совместное использование с другими асинхронными операциями

#![allow(unused)]
fn main() {
use blocking::unblock;
use smol::Timer;
use std::time::Duration;

async fn process_data() -> Result<(), Box<dyn std::error::Error>> {
    // Асинхронное ожидание
    Timer::after(Duration::from_secs(1)).await;
    
    // Блокирующая операция
    let data = unblock(|| std::fs::read_to_string("data.txt")).await?;
    
    // Другая асинхронная операция
    Timer::after(Duration::from_millis(500)).await;
    
    // Еще одна блокирующая операция
    unblock(|| std::fs::write("processed.txt", data.to_uppercase())).await?;
    
    Ok(())
}
}

Обработка ошибок

#![allow(unused)]
fn main() {
use blocking::unblock;

let result = unblock(|| -> Result<String, std::io::Error> {
    std::fs::read_to_string("missing_file.txt")
}).await;

match result {
    Ok(contents) => println!("Содержимое: {}", contents),
    Err(e) => println!("Ошибка: {}", e),
}
}

Преимущества

  • Не блокирует исполнитель: Блокирующие операции выполняются в отдельных потоках
  • Простота использования: Знакомый синтаксис с await
  • Безопасность: Гарантии типов Send + 'static
  • Интеграция: Легко комбинируется с другим асинхронным кодом

Альтернативы

Для длительных операций с состоянием рассмотрите использование Unblock<T>:

#![allow(unused)]
fn main() {
use blocking::Unblock;
use std::fs::File;

let file = Unblock::new(File::open("data.txt")?);
// Множественные операции с файлом через асинхронный интерфейс
}