Модуль stream
Из-за того, что включение трейта Stream в std произошло позже выпуска Tokio 1.0, большинство утилит потоков Tokio были перемещены в крейт tokio-stream.
Почему Stream не был включен в Tokio 1.0?
Изначально мы планировали выпустить Tokio 1.0 со стабильным типом Stream, но, к сожалению, RFC не был вовремя принят, чтобы Stream попал в std на стабильном компиляторе к моменту выпуска Tokio 1.0. По этой причине команда решила переместить все утилиты на основе Stream в крейт tokio-stream. Хотя это не идеально, как только Stream попадет в стандартную библиотеку и период MSRV истечет, мы реализуем stream для наших различных типов.
Хотя это может показаться неудачным, не все потеряно, так как вы можете получить большую часть поддержки Stream с помощью async/await и циклов while let. Также возможно создать impl Stream из async fn с помощью крейта async-stream.
Пример
Преобразование sync::mpsc::Receiver в impl Stream
#![allow(unused)] fn main() { use tokio::sync::mpsc; let (tx, mut rx) = mpsc::channel::<usize>(16); let stream = async_stream::stream! { while let Some(item) = rx.recv().await { yield item; } }; }
Использование tokio-stream для дополнительной функциональности
#![allow(unused)] fn main() { use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tokio::sync::mpsc; let (tx, rx) = mpsc::channel::<usize>(16); // Преобразование Receiver в Stream с помощью tokio-stream let mut stream = ReceiverStream::new(rx); // Использование методов StreamExt while let Some(item) = stream.next().await { println!("получен элемент: {}", item); } }
Создание пользовательского Stream с async-stream
#![allow(unused)] fn main() { use async_stream::stream; use std::time::Duration; use tokio::time::sleep; let mut counter = 0; let stream = stream! { for _ in 0..5 { sleep(Duration::from_secs(1)).await; counter += 1; yield counter; } }; // Использование созданного потока tokio::pin!(stream); while let Some(value) = stream.next().await { println!("значение: {}", value); } }
Рекомендации
Для работы с потоками в Tokio рекомендуется:
- Использовать крейт
tokio-streamдля дополнительных утилит потоков - Использовать
async-streamдля создания пользовательских потоков - Использовать комбинацию
async/awaitи цикловwhile letдля базовой обработки потоков - Следить за обновлениями относительно включения
Streamв стандартную библиотеку
Альтернативы
До стабилизации Stream в std можно использовать следующие подходы:
- Использовать
futures::Streamиз крейтаfutures - Использовать
async-streamдля создания потоков - Использовать циклы
while letс асинхронными итераторами - Использовать каналы Tokio (
mpsc,broadcastи др.) как потоки данных