Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Модуль sync

Доступно только с флагом функции sync.

Примитивы синхронизации для использования в асинхронных контекстах.

Программы Tokio обычно организованы как набор задач, где каждая задача работает независимо и может выполняться на отдельных физических потоках. Примитивы синхронизации, предоставляемые в этом модуле, позволяют этим независимым задачам взаимодействовать друг с другом.

Передача сообщений

Наиболее распространенной формой синхронизации в программе Tokio является передача сообщений. Две задачи работают независимо и отправляют сообщения друг другу для синхронизации. Это имеет преимущество в виде избегания разделяемого состояния.

Передача сообщений реализована с использованием каналов. Канал поддерживает отправку сообщения от одной задачи-производителя к одной или нескольким задачам-потребителям. Tokio предоставляет несколько разновидностей каналов. Каждая разновидность канала поддерживает различные шаблоны передачи сообщений. Когда канал поддерживает несколько производителей, многие отдельные задачи могут отправлять сообщения. Когда канал поддерживает несколько потребителей, многие различные отдельные задачи могут получать сообщения.

Tokio предоставляет множество различных разновидностей каналов, поскольку различные шаблоны передачи сообщений лучше обрабатываются разными реализациями.

Канал oneshot

Канал oneshot поддерживает отправку одного значения от одного производителя к одному потребителю. Этот канал обычно используется для отправки результата вычисления ожидающей стороне.

Пример: использование канала oneshot для получения результата вычисления.

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

async fn some_computation() -> String {
    "представляет результат вычисления".to_string()
}

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    let res = some_computation().await;
    tx.send(res).unwrap();
});

// Делаем другую работу, пока вычисление происходит в фоне

// Ожидаем результат вычисления
let res = rx.await.unwrap();
}

Примечание: если задача производит результат вычисления как свое последнее действие перед завершением, можно использовать JoinHandle для получения этого значения вместо выделения ресурсов для канала oneshot. Ожидание JoinHandle возвращает Result. Если задача паникует, JoinHandle возвращает Err с причиной паники.

Пример:

#![allow(unused)]
fn main() {
async fn some_computation() -> String {
    "результат вычисления".to_string()
}

let join_handle = tokio::spawn(async move {
    some_computation().await
});

// Делаем другую работу, пока вычисление происходит в фоне

// Ожидаем результат вычисления
let res = join_handle.await.unwrap();
}

Канал mpsc

Канал mpsc поддерживает отправку многих значений от многих производителей к одному потребителю. Этот канал часто используется для отправки работы задаче или для получения результата многих вычислений.

Это также канал, который вы должны использовать, если хотите отправлять много сообщений от одного производителя к одному потребителю. Нет специального канала spsc.

Пример: использование mpsc для постепенной потоковой передачи результатов серии вычислений.

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("результат вычисления {}", input)
}

let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
    for i in 0..10 {
        let res = some_computation(i).await;
        tx.send(res).await.unwrap();
    }
});

while let Some(res) = rx.recv().await {
    println!("получено = {}", res);
}
}

Аргумент mpsc::channel - это емкость канала. Это максимальное количество значений, которые могут храниться в канале в ожидании получения в любой момент времени. Правильная установка этого значения является ключевой для реализации надежных программ, поскольку емкость канала играет критическую роль в обработке обратного давления.

Распространенным шаблоном параллелизма для управления ресурсами является порождение задачи, посвященной управлению этим ресурсом, и использование передачи сообщений между другими задачами для взаимодействия с ресурсом. Ресурсом может быть что угодно, что не может использоваться конкурентно. Некоторые примеры включают сокет и состояние программы. Например, если нескольким задачам нужно отправлять данные через один сокет, породите задачу для управления сокетом и используйте канал для синхронизации.

Пример: отправка данных из многих задач через один сокет с использованием передачи сообщений.

use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100);

    for _ in 0..10 {
        // Каждой задаче нужен собственный дескриптор `tx`. Это делается путем клонирования
        // исходного дескриптора.
        let tx = tx.clone();

        tokio::spawn(async move {
            tx.send(&b"данные для записи"[..]).await.unwrap();
        });
    }

    // Половина `rx` канала возвращает `None`, когда **все** клоны `tx`
    // удалены. Чтобы гарантировать возврат `None`, удалите дескриптор, принадлежащий
    // текущей задаче. Если этот дескриптор `tx` не удален, всегда будет
    // один оставшийся дескриптор `tx`.
    drop(tx);

    while let Some(res) = rx.recv().await {
        socket.write_all(res).await?;
    }

    Ok(())
}

Каналы mpsc и oneshot можно комбинировать для предоставления шаблона синхронизации типа запрос/ответ с разделяемым ресурсом. Задача порождается для синхронизации ресурса и ожидает команды, полученные по каналу mpsc. Каждая команда включает oneshot Sender, на который отправляется результат команды.

Пример: использование задачи для синхронизации счетчика u64. Каждая задача отправляет команду "получить и увеличить". Значение счетчика до увеличения отправляется через предоставленный канал oneshot.

#![allow(unused)]
fn main() {
use tokio::sync::{oneshot, mpsc};
use Command::Increment;

enum Command {
    Increment,
    // Здесь можно добавить другие команды
}

let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);

// Порождение задачи для управления счетчиком
tokio::spawn(async move {
    let mut counter: u64 = 0;

    while let Some((cmd, response)) = cmd_rx.recv().await {
        match cmd {
            Increment => {
                let prev = counter;
                counter += 1;
                response.send(prev).unwrap();
            }
        }
    }
});

let mut join_handles = vec![];

// Порождение задач, которые будут отправлять команду увеличения.
for _ in 0..10 {
    let cmd_tx = cmd_tx.clone();

    join_handles.push(tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();

        cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
        let res = resp_rx.await.unwrap();

        println!("предыдущее значение = {}", res);
    }));
}

// Ожидание завершения всех задач
for join_handle in join_handles.drain(..) {
    join_handle.await.unwrap();
}
}

Канал broadcast

Канал broadcast поддерживает отправку многих значений от многих производителей ко многим потребителям. Каждый потребитель получит каждое значение. Этот канал можно использовать для реализации шаблонов типа "fan out", распространенных в системах pub/sub или "chat".

Этот канал используется реже, чем oneshot и mpsc, но все еще имеет свои случаи использования.

Это также канал, который вы должны использовать, если хотите транслировать значения от одного производителя ко многим потребителям. Нет специального канала spmc broadcast.

Базовое использование:

#![allow(unused)]
fn main() {
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();
}

Канал watch

Канал watch поддерживает отправку многих значений от многих производителей ко многим потребителям. Однако в канале хранится только самое последнее значение. Потребители уведомляются, когда отправляется новое значение, но нет гарантии, что потребители увидят все значения.

Канал watch похож на канал broadcast с емкостью 1.

Случаи использования канала watch включают трансляцию изменений конфигурации или сигнализацию изменений состояния программы, таких как переход к завершению работы.

Пример: использование канала watch для уведомления задач об изменениях конфигурации. В этом примере файл конфигурации проверяется периодически. Когда файл изменяется, изменения конфигурации сигнализируются потребителям.

#![allow(unused)]
fn main() {
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};

use std::io;

#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
    timeout: Duration,
}

impl Config {
    async fn load_from_file() -> io::Result<Config> {
        // логика загрузки файла и десериализации здесь
    }
}

async fn my_async_operation() {
    // Делаем что-то здесь
}

// Загружаем начальное значение конфигурации
let mut config = Config::load_from_file().await.unwrap();

// Создаем канал watch, инициализированный загруженной конфигурацией
let (tx, rx) = watch::channel(config.clone());

// Порождение задачи для мониторинга файла.
tokio::spawn(async move {
    loop {
        // Ждем 10 секунд между проверками
        time::sleep(Duration::from_secs(10)).await;

        // Загружаем файл конфигурации
        let new_config = Config::load_from_file().await.unwrap();

        // Если конфигурация изменилась, отправляем новое значение конфигурации
        // по каналу watch.
        if new_config != config {
            tx.send(new_config.clone()).unwrap();
            config = new_config;
        }
    }
});

let mut handles = vec![];

// Порождение задач, которые выполняют асинхронную операцию не более `timeout`. Если
// время ожидания истекает, перезапускаем операцию.
//
// Задача одновременно отслеживает изменения `Config`. Когда
// длительность таймаута изменяется, таймаут обновляется без перезапуска
// выполняемой операции.
for _ in 0..5 {
    // Клонируем дескриптор watch конфигурации для использования в этой задаче
    let mut rx = rx.clone();

    let handle = tokio::spawn(async move {
        // Начинаем первоначальную операцию и закрепляем future в стеке.
        // Закрепление в стеке требуется для возобновления операции
        // при множественных вызовах `select!`
        let op = my_async_operation();
        tokio::pin!(op);

        // Получаем начальное значение конфигурации
        let mut conf = rx.borrow().clone();

        let mut op_start = Instant::now();
        let sleep = time::sleep_until(op_start + conf.timeout);
        tokio::pin!(sleep);

        loop {
            tokio::select! {
                _ = &mut sleep => {
                    // Операция истекла. Перезапускаем ее
                    op.set(my_async_operation());

                    // Отслеживаем новое время начала
                    op_start = Instant::now();

                    // Перезапускаем таймаут
                    sleep.set(time::sleep_until(op_start + conf.timeout));
                }
                _ = rx.changed() => {
                    conf = rx.borrow_and_update().clone();

                    // Конфигурация была обновлена. Обновляем
                    // `sleep` используя новое значение `timeout`.
                    sleep.as_mut().reset(op_start + conf.timeout);
                }
                _ = &mut op => {
                    // Операция завершена!
                    return
                }
            }
        }
    });

    handles.push(handle);
}

for handle in handles.drain(..) {
    handle.await.unwrap();
}
}

Синхронизация состояния

Оставшиеся примитивы синхронизации сосредоточены на синхронизации состояния. Это асинхронные эквиваленты версий, предоставляемых std. Они работают аналогично своим аналогам из std, но будут ждать асинхронно вместо блокировки потока.

  • Barrier - Гарантирует, что несколько задач будут ждать друг друга для достижения точки в программе, прежде чем продолжить выполнение все вместе.
  • Mutex - Механизм взаимного исключения, который гарантирует, что не более одного потока в данный момент времени может получить доступ к некоторым данным.
  • Notify - Базовое уведомление задачи. Notify поддерживает уведомление принимающей задачи без отправки данных. В этом случае задача просыпается и возобновляет обработку.
  • RwLock - Предоставляет механизм взаимного исключения, который позволяет нескольким читателям одновременно, позволяя только одному писателю за раз. В некоторых случаях это может быть более эффективно, чем мьютекс.
  • Semaphore - Ограничивает количество параллелизма. Семафор содержит некоторое количество разрешений, которые задачи могут запрашивать для входа в критическую секцию. Семафоры полезны для реализации ограничений любого рода.

Совместимость со средой выполнения

Все примитивы синхронизации, предоставляемые в этом модуле, не зависят от среды выполнения. Вы можете свободно перемещать их между различными экземплярами среды выполнения Tokio или даже использовать их из сред выполнения, отличных от Tokio.

При использовании в среде выполнения Tokio примитивы синхронизации участвуют в кооперативном планировании, чтобы избежать голодания. Эта функция не применяется при использовании из сред выполнения, отличных от Tokio.

В качестве исключения, методы, оканчивающиеся на _timeout, не являются независимыми от среды выполнения, поскольку они требуют доступа к таймеру Tokio. См. документацию каждого метода *_timeout для получения дополнительной информации о его использовании.

Модули

ИмяОписание
broadcastМногопроизводительная, многопотребительская широковещательная очередь. Каждое отправленное значение видно всем потребителям.
futuresИменованные типы future.
mpscМногопроизводительная, однопотребительская очередь для отправки значений между асинхронными задачами.
oneshotОдноразовый канал используется для отправки одного сообщения между асинхронными задачами. Функция channel используется для создания пары дескрипторов Sender и Receiver, которые образуют канал.
watchМногопроизводительный, многопотребительский канал, который сохраняет только последнее отправленное значение.

Структуры

ИмяОписание
AcquireErrorОшибка, возвращаемая функцией Semaphore::acquire.
BarrierБарьер позволяет нескольким задачам синхронизировать начало некоторого вычисления.
BarrierWaitResultBarrierWaitResult возвращается wait, когда все задачи в Barrier встретились.
MappedMutexGuardДескриптор удерживаемого Mutex, к которому была применена функция через MutexGuard::map.
MutexАсинхронный тип, подобный Mutex.
MutexGuardДескриптор удерживаемого Mutex. Защита может удерживаться через любую точку .await, так как она Send.
NotifyУведомляет одну задачу о пробуждении.
OnceCellПотокобезопасная ячейка, в которую можно записать только один раз.
OwnedMappedMutexGuardВладеемый дескриптор удерживаемого Mutex, к которому была применена функция через OwnedMutexGuard::map.
OwnedMutexGuardВладеемый дескриптор удерживаемого Mutex.
OwnedRwLockMappedWriteGuardВладеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
OwnedRwLockReadGuardВладеемая структура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении.
OwnedRwLockWriteGuardВладеемая структура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
OwnedSemaphorePermitВладеемое разрешение от семафора.
RwLockАсинхронная блокировка читатель-писатель.
RwLockMappedWriteGuardСтруктура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
RwLockReadGuardСтруктура RAII, используемая для освобождения общего доступа на чтение блокировки при удалении.
RwLockWriteGuardСтруктура RAII, используемая для освобождения эксклюзивного доступа на запись блокировки при удалении.
SemaphoreСчитающий семафор, выполняющий асинхронное получение разрешений.
SemaphorePermitРазрешение от семафора.
SetOnceПотокобезопасная ячейка, в которую можно записать только один раз.
SetOnceErrorОшибка, которая может быть возвращена из SetOnce::set.
TryLockErrorОшибка, возвращаемая функциями Mutex::try_lock, RwLock::try_read и RwLock::try_write.

Перечисления

ИмяОписание
SetErrorОшибки, которые могут быть возвращены из OnceCell::set.
TryAcquireErrorОшибка, возвращаемая функцией Semaphore::try_acquire.