Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Модуль concurrent_stream

Конкурентное выполнение потоков.

Примеры

Конкурентная обработка элементов в коллекции

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream()
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello nori"]);
}

Конкурентная обработка элементов в потоке

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::stream;

let v: Vec<_> = stream::repeat("chashu")
    .co()
    .take(2)
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello chashu"]);
}

Дополнительные примеры использования

Ограничение количества одновременных операций

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::time::{sleep, Duration};

let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
    .into_co_stream()
    .limit(2) // Ограничиваем до 2 одновременных операций
    .map(|n| async move {
        sleep(Duration::from_millis(100)).await;
        n * 2
    })
    .collect()
    .await;

assert_eq!(results, vec![2, 4, 6, 8, 10]);
}

Использование enumerate для получения индексов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let items = vec!["a", "b", "c"];
let results: Vec<_> = items
    .into_co_stream()
    .enumerate()
    .map(|(index, item)| async move {
        format!("{index}: {item}")
    })
    .collect()
    .await;

assert_eq!(results, vec!["0: a", "1: b", "2: c"]);
}

Селективное взятие элементов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
    .into_co_stream()
    .take(3) // Берем только первые 3 элемента
    .map(|n| async move { n * 2 })
    .collect()
    .await;

assert_eq!(results, vec![2, 4, 6]);
}

Структуры

ИмяОписание
EnumerateКонкурентный итератор, который выдает текущий счетчик и элемент во время итерации.
FromStreamКонкурентная реализация for each из Stream.
LimitКонкурентный итератор, который ограничивает количество применяемого параллелизма.
MapПреобразует элементы из одного типа в другой.
TakeКонкурентный итератор, который итерируется только по первым n итерациям.

Перечисления

ИмяОписание
ConsumerStateСостояние потребителя, используется для обратной связи с источником.

Трейты

ИмяОписание
ConcurrentStreamКонкурентная работа с элементами в потоке.
ConsumerОписывает тип, который может получать данные.
FromConcurrentStreamПреобразование из ConcurrentStream.
IntoConcurrentStreamПреобразование в ConcurrentStream.

Практическое применение

Обработка HTTP запросов с ограничением параллелизма

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use reqwest::Client;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let client = Client::new();
    let response = client.get(url).send().await?;
    response.text().await
}

let urls = vec![
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
];

let results: Vec<Result<String, _>> = urls
    .into_co_stream()
    .limit(2) // Максимум 2 одновременных запроса
    .map(|url| async move {
        fetch_url(url).await
    })
    .collect()
    .await;
}

Параллельная обработка файлов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::fs;

async fn process_file(path: &str) -> Result<usize, std::io::Error> {
    let content = fs::read_to_string(path).await?;
    Ok(content.len())
}

let files = vec!["file1.txt", "file2.txt", "file3.txt"];
let sizes: Vec<Result<usize, _>> = files
    .into_co_stream()
    .map(|file| async move {
        process_file(file).await
    })
    .collect()
    .await;
}

Преимущества конкурентных потоков

  • Эффективное использование ресурсов: Операции выполняются конкурентно, а не последовательно
  • Контроль параллелизма: Возможность ограничивать количество одновременных операций
  • Композиционность: Легко комбинировать с другими операциями потоков
  • Безопасность типов: Статическая проверка типов на этапе компиляции