Конкурентное выполнение потоков.
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
let v: Vec<_> = vec!["chashu", "nori"]
.into_co_stream()
.map(|msg| async move { format!("hello {msg}") })
.collect()
.await;
assert_eq!(v, &["hello chashu", "hello nori"]);
}
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use futures_lite::stream;
let v: Vec<_> = stream::repeat("chashu")
.co()
.take(2)
.map(|msg| async move { format!("hello {msg}") })
.collect()
.await;
assert_eq!(v, &["hello chashu", "hello chashu"]);
}
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::time::{sleep, Duration};
let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
.into_co_stream()
.limit(2) // Ограничиваем до 2 одновременных операций
.map(|n| async move {
sleep(Duration::from_millis(100)).await;
n * 2
})
.collect()
.await;
assert_eq!(results, vec![2, 4, 6, 8, 10]);
}
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
let items = vec!["a", "b", "c"];
let results: Vec<_> = items
.into_co_stream()
.enumerate()
.map(|(index, item)| async move {
format!("{index}: {item}")
})
.collect()
.await;
assert_eq!(results, vec!["0: a", "1: b", "2: c"]);
}
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
let items = vec![1, 2, 3, 4, 5];
let results: Vec<_> = items
.into_co_stream()
.take(3) // Берем только первые 3 элемента
.map(|n| async move { n * 2 })
.collect()
.await;
assert_eq!(results, vec![2, 4, 6]);
}
| Имя | Описание |
Enumerate | Конкурентный итератор, который выдает текущий счетчик и элемент во время итерации. |
FromStream | Конкурентная реализация for each из Stream. |
Limit | Конкурентный итератор, который ограничивает количество применяемого параллелизма. |
Map | Преобразует элементы из одного типа в другой. |
Take | Конкурентный итератор, который итерируется только по первым n итерациям. |
| Имя | Описание |
ConsumerState | Состояние потребителя, используется для обратной связи с источником. |
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use reqwest::Client;
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
let client = Client::new();
let response = client.get(url).send().await?;
response.text().await
}
let urls = vec![
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
];
let results: Vec<Result<String, _>> = urls
.into_co_stream()
.limit(2) // Максимум 2 одновременных запроса
.map(|url| async move {
fetch_url(url).await
})
.collect()
.await;
}
#![allow(unused)]
fn main() {
use futures_concurrency::prelude::*;
use tokio::fs;
async fn process_file(path: &str) -> Result<usize, std::io::Error> {
let content = fs::read_to_string(path).await?;
Ok(content.len())
}
let files = vec!["file1.txt", "file2.txt", "file3.txt"];
let sizes: Vec<Result<usize, _>> = files
.into_co_stream()
.map(|file| async move {
process_file(file).await
})
.collect()
.await;
}
- Эффективное использование ресурсов: Операции выполняются конкурентно, а не последовательно
- Контроль параллелизма: Возможность ограничивать количество одновременных операций
- Композиционность: Легко комбинировать с другими операциями потоков
- Безопасность типов: Статическая проверка типов на этапе компиляции