Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Модуль stream

Компонуемая асинхронная итерация.

Примеры

Объединение нескольких потоков для обработки значений сразу по готовности

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::stream::{self, StreamExt};
use futures_lite::future::block_on;

block_on(async {
    let a = stream::once(1);
    let b = stream::once(2);
    let c = stream::once(3);
    let s = (a, b, c).merge();

    let mut counter = 0;
    s.for_each(|n| counter += n).await;
    assert_eq!(counter, 6);
})
}

Конкурентность

При работе с несколькими (асинхронными) итераторами порядок, в котором итераторы ожидаются, важен. Как часть асинхронных итераторов, Rust предоставляет встроенные операции для управления порядком выполнения наборов итераторов:

  • merge: объединяет несколько итераторов в один итератор, где новый итератор выдает элемент, как только он становится доступен из одного из базовых итераторов.
  • zip: объединяет несколько итераторов в итератор пар. Базовые итераторы будут ожидаться конкурентно.
  • chain: итерируется по нескольким итераторам последовательно. Следующий итератор в последовательности не начнется, пока предыдущий итератор не завершится.

Future

Future можно рассматривать как асинхронные последовательности из одного элемента. Используя stream::once, future можно преобразовать в асинхронные итераторы и затем использовать с любыми методами конкурентности итераторов. Это позволяет использовать такие операции, как stream::Merge, для конкурентного выполнения наборов future, но получать выходные данные отдельных future, как только они становятся доступны.

См. документацию по конкурентности future для получения дополнительной информации о конкурентности future.

Дополнительные примеры

Использование zip для параллельной обработки потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let names = stream::iter(vec!["Alice", "Bob", "Charlie"]);
let ages = stream::iter(vec![25, 30, 35]);

let zipped = (names, ages).zip();
let mut results = Vec::new();

while let Some((name, age)) = zipped.next().await {
    results.push(format!("{} is {} years old", name, age));
}

assert_eq!(results, vec![
    "Alice is 25 years old",
    "Bob is 30 years old", 
    "Charlie is 35 years old"
]);
}

Использование chain для последовательного соединения потоков

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let first = stream::iter(vec![1, 2]);
let second = stream::iter(vec![3, 4]);
let third = stream::iter(vec![5, 6]);

let chained = (first, second, third).chain();
let results: Vec<_> = chained.collect().await;

assert_eq!(results, vec![1, 2, 3, 4, 5, 6]);
}

Использование StreamGroup для динамического управления потоками

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_concurrency::stream::StreamGroup;
use tokio_stream::{self as stream, StreamExt};

let mut group = StreamGroup::new();

// Добавляем потоки в группу
group.push(stream::iter(vec![1, 2]));
group.push(stream::iter(vec![3, 4]));

// Объединяем все потоки в группе
let mut merged = group.merge();
let mut results = Vec::new();

while let Some(value) = merged.next().await {
    results.push(value);
}

results.sort();
assert_eq!(results, vec![1, 2, 3, 4]);
}

Практический пример с обработкой событий

use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};
use tokio::time::{interval, Duration};
use std::pin::Pin;

#[tokio::main]
async fn main() {
    // Создаем несколько потоков событий
    let user_events = stream::iter(vec!["login", "purchase", "logout"]);
    let system_events = stream::iter(vec!["startup", "shutdown"]);
    let timer_events = {
        let mut interval = interval(Duration::from_secs(1));
        stream::unfold((), move |_| {
            let interval = Pin::new(&mut interval);
            async move {
                interval.tick().await;
                Some(("tick", ()))
            }
        })
    };

    // Объединяем все потоки событий
    let all_events = (user_events, system_events, timer_events.take(2)).merge();
    
    let mut event_count = 0;
    all_events
        .for_each(|event| {
            event_count += 1;
            println!("Обработано событие: {:?}", event);
            async {}
        })
        .await;
        
    println!("Всего обработано событий: {}", event_count);
}

Модули

ИмяОписание
stream_groupРасширяемая группа потоков, которые действуют как единое целое.

Структуры

ИмяОписание
StreamGroupРасширяемая группа потоков, которые действуют как единое целое.
WaitUntilЗадерживает выполнение потока один раз на указанную длительность.

Трейты

ИмяОписание
ChainБерет несколько потоков и создает новый поток для всех последовательно.
IntoStreamПреобразование в Stream.
MergeОбъединяет несколько потоков в один поток всех их выходных данных.
StreamExtТрейт-расширение для трейта Stream.
Zip"Объединяет" несколько потоков в один поток пар.

Ключевые особенности

  • Композиционность: Легко комбинировать различные операции с потоками
  • Конкурентность: Эффективное управление несколькими потоками данных
  • Безопасность типов: Статическая проверка на этапе компиляции
  • Гибкость: Поддержка различных структур данных и паттернов использования
  • Эффективность: Минимизация накладных расходов при работе с потоками