Модуль stream
Компонуемая асинхронная итерация.
Примеры
Объединение нескольких потоков для обработки значений сразу по готовности
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_lite::stream::{self, StreamExt}; use futures_lite::future::block_on; block_on(async { let a = stream::once(1); let b = stream::once(2); let c = stream::once(3); let s = (a, b, c).merge(); let mut counter = 0; s.for_each(|n| counter += n).await; assert_eq!(counter, 6); }) }
Конкурентность
При работе с несколькими (асинхронными) итераторами порядок, в котором итераторы ожидаются, важен. Как часть асинхронных итераторов, Rust предоставляет встроенные операции для управления порядком выполнения наборов итераторов:
- merge: объединяет несколько итераторов в один итератор, где новый итератор выдает элемент, как только он становится доступен из одного из базовых итераторов.
- zip: объединяет несколько итераторов в итератор пар. Базовые итераторы будут ожидаться конкурентно.
- chain: итерируется по нескольким итераторам последовательно. Следующий итератор в последовательности не начнется, пока предыдущий итератор не завершится.
Future
Future можно рассматривать как асинхронные последовательности из одного элемента. Используя stream::once, future можно преобразовать в асинхронные итераторы и затем использовать с любыми методами конкурентности итераторов. Это позволяет использовать такие операции, как stream::Merge, для конкурентного выполнения наборов future, но получать выходные данные отдельных future, как только они становятся доступны.
См. документацию по конкурентности future для получения дополнительной информации о конкурентности future.
Дополнительные примеры
Использование zip для параллельной обработки потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let names = stream::iter(vec!["Alice", "Bob", "Charlie"]); let ages = stream::iter(vec![25, 30, 35]); let zipped = (names, ages).zip(); let mut results = Vec::new(); while let Some((name, age)) = zipped.next().await { results.push(format!("{} is {} years old", name, age)); } assert_eq!(results, vec![ "Alice is 25 years old", "Bob is 30 years old", "Charlie is 35 years old" ]); }
Использование chain для последовательного соединения потоков
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let first = stream::iter(vec![1, 2]); let second = stream::iter(vec![3, 4]); let third = stream::iter(vec![5, 6]); let chained = (first, second, third).chain(); let results: Vec<_> = chained.collect().await; assert_eq!(results, vec![1, 2, 3, 4, 5, 6]); }
Использование StreamGroup для динамического управления потоками
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use futures_concurrency::stream::StreamGroup; use tokio_stream::{self as stream, StreamExt}; let mut group = StreamGroup::new(); // Добавляем потоки в группу group.push(stream::iter(vec![1, 2])); group.push(stream::iter(vec![3, 4])); // Объединяем все потоки в группе let mut merged = group.merge(); let mut results = Vec::new(); while let Some(value) = merged.next().await { results.push(value); } results.sort(); assert_eq!(results, vec![1, 2, 3, 4]); }
Практический пример с обработкой событий
use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; use tokio::time::{interval, Duration}; use std::pin::Pin; #[tokio::main] async fn main() { // Создаем несколько потоков событий let user_events = stream::iter(vec!["login", "purchase", "logout"]); let system_events = stream::iter(vec!["startup", "shutdown"]); let timer_events = { let mut interval = interval(Duration::from_secs(1)); stream::unfold((), move |_| { let interval = Pin::new(&mut interval); async move { interval.tick().await; Some(("tick", ())) } }) }; // Объединяем все потоки событий let all_events = (user_events, system_events, timer_events.take(2)).merge(); let mut event_count = 0; all_events .for_each(|event| { event_count += 1; println!("Обработано событие: {:?}", event); async {} }) .await; println!("Всего обработано событий: {}", event_count); }
Модули
| Имя | Описание |
|---|---|
stream_group | Расширяемая группа потоков, которые действуют как единое целое. |
Структуры
| Имя | Описание |
|---|---|
StreamGroup | Расширяемая группа потоков, которые действуют как единое целое. |
WaitUntil | Задерживает выполнение потока один раз на указанную длительность. |
Трейты
| Имя | Описание |
|---|---|
Chain | Берет несколько потоков и создает новый поток для всех последовательно. |
IntoStream | Преобразование в Stream. |
Merge | Объединяет несколько потоков в один поток всех их выходных данных. |
StreamExt | Трейт-расширение для трейта Stream. |
Zip | "Объединяет" несколько потоков в один поток пар. |
Ключевые особенности
- Композиционность: Легко комбинировать различные операции с потоками
- Конкурентность: Эффективное управление несколькими потоками данных
- Безопасность типов: Статическая проверка на этапе компиляции
- Гибкость: Поддержка различных структур данных и паттернов использования
- Эффективность: Минимизация накладных расходов при работе с потоками