Асинхронные итераторы (ранее известные как потоки - streams)
- Stream как асинхронный итератор или как множество фьючерсов
- Работа в процессе (WIP)
- текущий статус
- трейты Stream в futures и Tokio
- ночной (nightly) трейт
- Ленивость, как у синхронных итераторов
- Закрепление (pinning) и потоки (ссылка вперед на главу о закреплении)
- "Плавленые" (fused) потоки
Потребление асинхронного итератора
while letс асинхроннымnextfor_each,for_each_concurrentcollectinto_future,buffered
Комбинаторы потоков
- Принятие фьючерса вместо замыкания
- Некоторые примеры комбинаторов
- Неупорядоченные вариации
StreamGroup
join/select/race с потоками
- Опасности с
selectв цикле - "Плавление" (fusing)
- Отличие от просто фьючерсов
- Альтернативы этим конструкциям
Stream::mergeи т.д.
Реализация асинхронного итератора
- Реализация трейта
- Практические аспекты и вспомогательные функции
- Макрос
async_iterдля потоков
Приемники (Sinks)
- https://docs.rs/futures/latest/futures/sink/index.html
Будущая работа
- Текущий статус
- https://rust-lang.github.io/rfcs/2996-async-iterator.html
async nextvspoll- Синтаксис асинхронной итерации
- (Асинхронные) генераторы
- Заимствующие итераторы (lending iterators)
Асинхронные итераторы (ранее известные как "потоки" или "streams") представляют собой последовательности значений, которые производятся асинхронно. Они являются асинхронным аналогом трейта Iterator из стандартной библиотеки.
Stream как асинхронный итератор или как множество фьючерсов
Асинхронный итератор можно рассматривать двумя способами:
-
Как асинхронный аналог
Iterator:#![allow(unused)] fn main() { trait AsyncIterator { type Item; async fn next(&mut self) -> Option<Self::Item>; } } -
Как последовательность фьючерсов, которые производят значения:
#![allow(unused)] fn main() { // Концептуально - поток это Future<Output = Option<T>> }
Работа в процессе (WIP)
Текущий статус
На момент написания, асинхронные итераторы все еще находятся в стадии активной разработки. Существует несколько конкурирующих подходов и трейтов.
Трейты Stream в futures и Tokio
Трейт futures::Stream (де-факто стандарт):
#![allow(unused)] fn main() { pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; } }
Трейт tokio_stream::Stream (совместим с futures):
#![allow(unused)] fn main() { // В основном такой же, как futures::Stream }
Ночной (nightly) трейт
В ночной версии Rust идет работа над стандартизированным трейтом:
#![allow(unused)] #![feature(async_iterator)] fn main() { use std::async_iter::AsyncIterator; // Пока что экспериментальный API }
Ленивость, как у синхронных итераторов
Как и синхронные итераторы, асинхронные итераторы ленивы - они не производят значения, пока их не запросят:
#![allow(unused)] fn main() { use futures::stream::StreamExt; let mut stream = some_async_generator(); while let Some(item) = stream.next().await { // Обрабатываем каждый элемент по мере поступления process(item).await; } }
Закрепление (pinning) и потоки
Поскольку потоки часто содержат ссылки на самих себя, они требуют закрепления (pinning) для безопасности. Метод poll_next принимает Pin<&mut Self>.
"Плавленые" (fused) потоки
"Плавленый" поток гарантирует, что после возврата None все последующие вызовы next() также вернут None:
#![allow(unused)] fn main() { use futures::stream::StreamExt; let mut stream = some_stream().fuse(); while let Some(item) = stream.next().await { // ... } // Дальнейшие вызовы stream.next().await всегда вернут None }
Потребление асинхронного итератора
while let с асинхронным next
Базовый способ итерации:
#![allow(unused)] fn main() { use futures::stream::StreamExt; let mut stream = tokio_stream::iter(vec![1, 2, 3]); while let Some(value) = stream.next().await { println!("Получено: {}", value); } }
for_each, for_each_concurrent
Последовательная обработка:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; iter(vec![1, 2, 3]) .for_each(|item| async move { println!("Обработка: {}", item); process_item(item).await; }) .await; }
Параллельная обработка:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; iter(vec![1, 2, 3, 4, 5]) .for_each_concurrent(Some(3), |item| async move { // До 3 элементов обрабатываются одновременно println!("Обработка: {}", item); tokio::time::sleep(Duration::from_secs(1)).await; }) .await; }
collect
Сбор всех элементов в коллекцию:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; let numbers: Vec<i32> = iter(vec![1, 2, 3]).collect().await; println!("Собраны: {:?}", numbers); }
into_future, buffered
into_future преобразует поток в фьючерс, который завершается первым элементом:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; let (first, rest) = iter(vec![1, 2, 3]).into_future().await; println!("Первый элемент: {:?}", first); }
buffered выполняет фьючерсы из потока с ограничением параллелизма:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; let stream = iter(vec![ async { 1 }, async { 2 }, async { 3 }, ]); let results: Vec<i32> = stream.buffered(2).collect().await; // Выполняет до 2 фьючерсов одновременно }
Комбинаторы потоков
Принятие фьючерса вместо замыкания
Многие комбинаторы потоков принимают фьючерсы, а не замыкания:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; let stream = iter(vec![1, 2, 3]) .then(|x| async move { x * 2 }) // Принимает фьючерс .map(|x| x + 1) // Принимает синхронное замыкание .collect::<Vec<_>>() .await; }
Некоторые примеры комбинаторов
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; let results = iter(1..=10) .filter(|&x| async move { x % 2 == 0 }) // Фильтрация .map(|x| x * 3) // Преобразование .take(5) // Ограничение количества .collect::<Vec<_>>() .await; }
Неупорядоченные вариации
buffer_unordered выполняет фьючерсы параллельно без сохранения порядка:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; use tokio::time::{sleep, Duration}; let stream = iter(vec![ async { sleep(Duration::from_millis(300)).await; 1 }, async { sleep(Duration::from_millis(100)).await; 2 }, async { sleep(Duration::from_millis(200)).await; 3 }, ]); let results = stream.buffer_unordered(10).collect::<Vec<_>>().await; // Порядок: [2, 3, 1] (по времени завершения) }
StreamGroup
В крейте futures-concurrency:
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures::stream::{StreamExt, iter}; let streams = vec![ iter(vec![1, 2, 3]), iter(vec![4, 5, 6]), ]; let mut group = streams.merge(); // Объединение потоков while let Some(item) = group.next().await { println!("Получено из любого потока: {}", item); } }
join/select/race с потоками
Опасности с select в цикле
Использование select! с потоками в цикле может быть опасным:
#![allow(unused)] fn main() { use tokio_stream::StreamExt; let mut stream_a = some_stream(); let mut stream_b = another_stream(); loop { tokio::select! { Some(item) = stream_a.next() => { process_a(item).await; } Some(item) = stream_b.next() => { process_b(item).await; } else => break, } } }
Проблема: Если поток завершился (вернул None), он все равно будет опрашиваться в следующих итерациях.
"Плавление" (fusing)
Решение - использование "плавленых" потоков:
#![allow(unused)] fn main() { use futures::stream::StreamExt; let mut stream_a = some_stream().fuse(); let mut stream_b = another_stream().fuse(); loop { tokio::select! { item = stream_a.next() => { if let Some(value) = item { process_a(value).await; } else { // stream_a завершен } } item = stream_b.next() => { if let Some(value) = item { process_b(value).await; } else { // stream_b завершен } } // Выход, когда оба потока завершены () = async { if stream_a.is_terminated() && stream_b.is_terminated() } => { break; } } } }
Отличие от просто фьючерсов
Потоки производят множественные значения с течением времени, в то время как фьючерсы производят одно значение.
Альтернативы
Stream::merge объединяет несколько потоков в один:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, select}; let merged = select(stream_a, stream_b); while let Some(item) = merged.next().await { // Элементы из любого потока } }
Реализация асинхронного итератора
Реализация трейта
#![allow(unused)] fn main() { use futures::stream::Stream; use std::pin::Pin; use std::task::{Context, Poll}; struct Counter { current: u32, max: u32, } impl Stream for Counter { type Item = u32; fn poll_next( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { if self.current < self.max { let current = self.current; self.current += 1; Poll::Ready(Some(current)) } else { Poll::Ready(None) } } } }
Практические аспекты и вспомогательные функции
Использование готовых комбинаторов часто проще, чем реализация с нуля:
#![allow(unused)] fn main() { use futures::stream::{StreamExt, iter}; // Вместо реализации Stream, используйте готовые строительные блоки let stream = iter(0..10) .filter(|&x| async move { x % 2 == 0 }) .map(|x| x * 2); }
Макрос async_stream для потоков
Крейт tokio-stream предоставляет макрос для создания потоков:
#![allow(unused)] fn main() { use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tokio::sync::mpsc; let (tx, rx) = mpsc::channel(32); let stream = ReceiverStream::new(rx); // Или с макросом (если доступен) // let stream = async_stream::stream! { // for i in 0..10 { // yield i; // tokio::time::sleep(Duration::from_millis(100)).await; // } // }; }
Приемники (Sinks)
Трейт Sink представляет собой потребителя значений, который может принимать их асинхронно:
#![allow(unused)] fn main() { use futures::sink::Sink; use std::pin::Pin; use std::task::{Context, Poll}; pub trait Sink<Item> { type Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>; fn start_send( self: Pin<&mut Self>, item: Item ) -> Result<(), Self::Error>; fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>; fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>; } }
Sinks используются для каналов, сокетов и других получателей данных.
Будущая работа
Текущий статус
RFC 2996 предлагает стандартизацию асинхронных итераторов:
https://rust-lang.github.io/rfcs/2996-async-iterator.html
async next vs poll
Текущие дебаты сосредоточены на том, должен ли трейт использовать async fn next() или метод poll_next.
Синтаксис асинхронной итерации
Возможный будущий синтаксис:
#![allow(unused)] fn main() { for await item in stream { println!("{}", item); } }
(Асинхронные) генераторы
Генераторы могут упростить создание потоков:
#![allow(unused)] fn main() { async fn* number_generator() -> impl AsyncIterator<Item = u32> { for i in 0.. { yield i; tokio::time::sleep(Duration::from_secs(1)).await; } } }
Заимствующие итераторы (lending iterators)
Также известные как "streaming iterators", они позволяют итераторам возвращать заимствованные данные, что особенно полезно для асинхронных сценариев с нулевым копированием.