Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Модуль stream

Из-за того, что включение трейта Stream в std произошло позже выпуска Tokio 1.0, большинство утилит потоков Tokio были перемещены в крейт tokio-stream.

Почему Stream не был включен в Tokio 1.0?

Изначально мы планировали выпустить Tokio 1.0 со стабильным типом Stream, но, к сожалению, RFC не был вовремя принят, чтобы Stream попал в std на стабильном компиляторе к моменту выпуска Tokio 1.0. По этой причине команда решила переместить все утилиты на основе Stream в крейт tokio-stream. Хотя это не идеально, как только Stream попадет в стандартную библиотеку и период MSRV истечет, мы реализуем stream для наших различных типов.

Хотя это может показаться неудачным, не все потеряно, так как вы можете получить большую часть поддержки Stream с помощью async/await и циклов while let. Также возможно создать impl Stream из async fn с помощью крейта async-stream.

Пример

Преобразование sync::mpsc::Receiver в impl Stream

#![allow(unused)]
fn main() {
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::<usize>(16);

let stream = async_stream::stream! {
    while let Some(item) = rx.recv().await {
        yield item;
    }
};
}

Использование tokio-stream для дополнительной функциональности

#![allow(unused)]
fn main() {
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel::<usize>(16);

// Преобразование Receiver в Stream с помощью tokio-stream
let mut stream = ReceiverStream::new(rx);

// Использование методов StreamExt
while let Some(item) = stream.next().await {
    println!("получен элемент: {}", item);
}
}

Создание пользовательского Stream с async-stream

#![allow(unused)]
fn main() {
use async_stream::stream;
use std::time::Duration;
use tokio::time::sleep;

let mut counter = 0;
let stream = stream! {
    for _ in 0..5 {
        sleep(Duration::from_secs(1)).await;
        counter += 1;
        yield counter;
    }
};

// Использование созданного потока
tokio::pin!(stream);
while let Some(value) = stream.next().await {
    println!("значение: {}", value);
}
}

Рекомендации

Для работы с потоками в Tokio рекомендуется:

  1. Использовать крейт tokio-stream для дополнительных утилит потоков
  2. Использовать async-stream для создания пользовательских потоков
  3. Использовать комбинацию async/await и циклов while let для базовой обработки потоков
  4. Следить за обновлениями относительно включения Stream в стандартную библиотеку

Альтернативы

До стабилизации Stream в std можно использовать следующие подходы:

  • Использовать futures::Stream из крейта futures
  • Использовать async-stream для создания потоков
  • Использовать циклы while let с асинхронными итераторами
  • Использовать каналы Tokio (mpsc, broadcast и др.) как потоки данных