Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Модуль vec

Типы параллельных итераторов для векторов (Vec<T>)

Вам редко понадобится взаимодействовать с этим модулем напрямую, если только вам не нужно указать имя одного из типов итераторов.

Структуры

ИмяОписание
AggregateErrorКоллекция ошибок.
ChainПоток, который объединяет несколько потоков один за другим.
IntoConcurrentStreamКонкурентный асинхронный итератор, который перемещается из вектора.
JoinFuture, который ожидает завершения нескольких future.
MergeПоток, который объединяет несколько потоков в один поток.
RaceFuture, который ожидает завершения первого future.
RaceOkFuture, который ожидает завершения первого успешного future.
TryJoinFuture, который ожидает успешного завершения всех future или досрочно прерывается при ошибке.
ZipПоток, который "объединяет" несколько потоков в один поток пар.

Примеры использования

Базовое использование с векторами

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let futures = vec![
    async { 1 },
    async { 2 }, 
    async { 3 },
];

let results = futures.join().await;
assert_eq!(results, vec![1, 2, 3]);
}

Конкурентная обработка элементов вектора

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::time::{sleep, Duration};

let items = vec!["hello", "world", "rust"];

let results: Vec<String> = items
    .into_co_stream()  // Преобразование вектора в конкурентный поток
    .map(|s| async move {
        sleep(Duration::from_millis(10)).await;
        s.to_uppercase()
    })
    .collect()
    .await;

assert_eq!(results, vec!["HELLO", "WORLD", "RUST"]);
}

Обработка ошибок с векторами future

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

let futures = vec![
    async { Ok::<i32, &str>(1) },
    async { Err::<i32, &str>("error occurred") },
    async { Ok::<i32, &str>(3) },
];

let result = futures.try_join().await;
assert!(result.is_err());
}

Гонка future в векторе

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let futures = vec![
    async {
        sleep(Duration::from_millis(100)).await;
        1
    },
    async {
        sleep(Duration::from_millis(50)).await;
        2
    },
    async {
        sleep(Duration::from_millis(10)).await;
        3
    },
];

let winner = futures.race().await;
assert_eq!(winner, 3); // Самый быстрый future
}

Объединение потоков из векторов

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio_stream::{self as stream, StreamExt};

let streams = vec![
    stream::iter(vec![1, 2]),
    stream::iter(vec![3, 4]),
    stream::iter(vec![5, 6]),
];

let mut merged = streams.merge();
let mut results = Vec::new();

while let Some(value) = merged.next().await {
    results.push(value);
}

results.sort();
assert_eq!(results, vec![1, 2, 3, 4, 5, 6]);
}

Практический пример: параллельные HTTP запросы

use futures_concurrency::prelude::*;
use reqwest::Client;

async fn fetch_url(client: &Client, url: &str) -> Result<String, reqwest::Error> {
    let response = client.get(url).send().await?;
    response.text().await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let urls = vec![
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent", 
        "https://httpbin.org/headers",
    ];

    // Создаем вектор future
    let fetch_futures: Vec<_> = urls
        .into_iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    // Параллельное выполнение всех запросов
    let results = fetch_futures.try_join().await?;

    for (i, content) in results.iter().enumerate() {
        println!("Запрос {}: {} байт", i, content.len());
    }

    Ok(())
}

Использование RaceOk с вектором ошибочных операций

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

let operations = vec![
    async {
        sleep(Duration::from_millis(50)).await;
        Err::<i32, &str>("slow error")
    },
    async {
        sleep(Duration::from_millis(10)).await; 
        Ok::<i32, &str>(42)
    },
    async {
        sleep(Duration::from_millis(30)).await;
        Ok::<i32, &str>(100)
    },
];

let result = operations.race_ok().await;
assert_eq!(result, Ok(42)); // Первый успешный результат
}

Особенности работы с векторами

Модуль vec предоставляет специализированные реализации для типа Vec<T>, что обеспечивает:

  • Эффективность: Оптимизированные реализации для динамических коллекций
  • Гибкость: Работа с переменным количеством элементов
  • Удобство: Простое преобразование существующих векторов
  • Интеграция: Легкое взаимодействие с другими частями стандартной библиотеки

Примечание по использованию

Хотя вы можете напрямую использовать типы из этого модуля, в большинстве случаев достаточно импорта futures_concurrency::prelude::* и использования методов напрямую на векторах:

#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;

// Вместо прямого использования vec::Join:
// let results = vec::Join::new(futures).await;

// Просто используйте метод на векторе:
let results = futures.join().await;
}