Модуль vec
Типы параллельных итераторов для векторов (Vec<T>)
Вам редко понадобится взаимодействовать с этим модулем напрямую, если только вам не нужно указать имя одного из типов итераторов.
Структуры
| Имя | Описание |
|---|---|
AggregateError | Коллекция ошибок. |
Chain | Поток, который объединяет несколько потоков один за другим. |
IntoConcurrentStream | Конкурентный асинхронный итератор, который перемещается из вектора. |
Join | Future, который ожидает завершения нескольких future. |
Merge | Поток, который объединяет несколько потоков в один поток. |
Race | Future, который ожидает завершения первого future. |
RaceOk | Future, который ожидает завершения первого успешного future. |
TryJoin | Future, который ожидает успешного завершения всех future или досрочно прерывается при ошибке. |
Zip | Поток, который "объединяет" несколько потоков в один поток пар. |
Примеры использования
Базовое использование с векторами
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let futures = vec![ async { 1 }, async { 2 }, async { 3 }, ]; let results = futures.join().await; assert_eq!(results, vec![1, 2, 3]); }
Конкурентная обработка элементов вектора
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio::time::{sleep, Duration}; let items = vec!["hello", "world", "rust"]; let results: Vec<String> = items .into_co_stream() // Преобразование вектора в конкурентный поток .map(|s| async move { sleep(Duration::from_millis(10)).await; s.to_uppercase() }) .collect() .await; assert_eq!(results, vec!["HELLO", "WORLD", "RUST"]); }
Обработка ошибок с векторами future
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let futures = vec![ async { Ok::<i32, &str>(1) }, async { Err::<i32, &str>("error occurred") }, async { Ok::<i32, &str>(3) }, ]; let result = futures.try_join().await; assert!(result.is_err()); }
Гонка future в векторе
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let futures = vec![ async { sleep(Duration::from_millis(100)).await; 1 }, async { sleep(Duration::from_millis(50)).await; 2 }, async { sleep(Duration::from_millis(10)).await; 3 }, ]; let winner = futures.race().await; assert_eq!(winner, 3); // Самый быстрый future }
Объединение потоков из векторов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use tokio_stream::{self as stream, StreamExt}; let streams = vec![ stream::iter(vec![1, 2]), stream::iter(vec![3, 4]), stream::iter(vec![5, 6]), ]; let mut merged = streams.merge(); let mut results = Vec::new(); while let Some(value) = merged.next().await { results.push(value); } results.sort(); assert_eq!(results, vec![1, 2, 3, 4, 5, 6]); }
Практический пример: параллельные HTTP запросы
use futures_concurrency::prelude::*; use reqwest::Client; async fn fetch_url(client: &Client, url: &str) -> Result<String, reqwest::Error> { let response = client.get(url).send().await?; response.text().await } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let client = Client::new(); let urls = vec![ "https://httpbin.org/ip", "https://httpbin.org/user-agent", "https://httpbin.org/headers", ]; // Создаем вектор future let fetch_futures: Vec<_> = urls .into_iter() .map(|url| fetch_url(&client, url)) .collect(); // Параллельное выполнение всех запросов let results = fetch_futures.try_join().await?; for (i, content) in results.iter().enumerate() { println!("Запрос {}: {} байт", i, content.len()); } Ok(()) }
Использование RaceOk с вектором ошибочных операций
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::time::Duration; use tokio::time::sleep; let operations = vec![ async { sleep(Duration::from_millis(50)).await; Err::<i32, &str>("slow error") }, async { sleep(Duration::from_millis(10)).await; Ok::<i32, &str>(42) }, async { sleep(Duration::from_millis(30)).await; Ok::<i32, &str>(100) }, ]; let result = operations.race_ok().await; assert_eq!(result, Ok(42)); // Первый успешный результат }
Особенности работы с векторами
Модуль vec предоставляет специализированные реализации для типа Vec<T>, что обеспечивает:
- Эффективность: Оптимизированные реализации для динамических коллекций
- Гибкость: Работа с переменным количеством элементов
- Удобство: Простое преобразование существующих векторов
- Интеграция: Легкое взаимодействие с другими частями стандартной библиотеки
Примечание по использованию
Хотя вы можете напрямую использовать типы из этого модуля, в большинстве случаев достаточно импорта futures_concurrency::prelude::* и использования методов напрямую на векторах:
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; // Вместо прямого использования vec::Join: // let results = vec::Join::new(futures).await; // Просто используйте метод на векторе: let results = futures.join().await; }