Крейт futures_concurrency
Производительные, переносимые, структурированные операции конкурентности для асинхронного Rust. Работает с любой средой выполнения, не стирает времена жизни, всегда обрабатывает отмену и всегда возвращает результат вызывающей стороне.
futures-concurrency предоставляет операции конкурентности как для групп future, так и для stream. Как для ограниченных, так и для неограниченных наборов future и stream. В обоих случаях производительность должна быть на уровне, если не превосходить, традиционных реализаций исполнителей.
Примеры
Ожидание нескольких future разных типов
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use std::future; let a = future::ready(1u8); let b = future::ready("hello"); let c = future::ready(3u16); assert_eq!((a, b, c).join().await, (1, "hello", 3)); }
Конкурентная обработка элементов в коллекции
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let v: Vec<_> = vec!["chashu", "nori"] .into_co_stream() .map(|msg| async move { format!("hello {msg}") }) .collect() .await; assert_eq!(v, &["hello chashu", "hello nori"]); }
Доступ к данным стека вне области видимости future
Адаптировано из std::thread::scope.
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; let mut container = vec![1, 2, 3]; let mut num = 0; let a = async { println!("hello from the first future"); dbg!(&container); }; let b = async { println!("hello from the second future"); num += container[0] + container[2]; }; println!("hello from the main future"); let _ = (a, b).join().await; container.push(4); assert_eq!(num, container.len()); }
Операции
Future
Для future, которые возвращают обычный тип T, доступны только операции join и race. join ожидает завершения всех future, тогда как race ожидает завершения первого future. Однако для future, которые возвращают Try<Output = T>, доступны две дополнительные операции. Следующая таблица описывает поведение операций конкурентности для ошибочных future:
| Ожидание всех результатов | Ожидание первого результата | |
|---|---|---|
| Продолжать при ошибке | Future::join | Future::race_ok |
| Прерывать при ошибке | Future::try_join | Future::race |
Следующие реализации future предоставляются futures-concurrency:
FutureGroup: Расширяемая группа future, работающих как единое целое.tuple:join,try_join,race,race_okarray:join,try_join,race,race_okVec:join,try_join,race,race_ok
Stream
Stream выдают результаты по одному, что означает, что решение о прекращении итерации одинаково для ошибочных и безошибочных потоков. Операции, предоставляемые для потоков, можно классифицировать на основе того, могут ли их входные данные оцениваться конкурентно и могут ли их выходные данные обрабатываться конкурентно.
Конкретно в случае merge, он принимает N потоков и выдает элементы по одному, как только они становятся доступными. Это позволяет конкурентно обрабатывать выходные данные отдельных потоков последующими операциями.
| Последовательная обработка вывода | Конкурентная обработка вывода | |
|---|---|---|
| Последовательная оценка ввода | Stream::chain | пока недоступно ‡ |
| Конкурентная оценка ввода | Stream::zip | Stream::merge |
‡: Это можно было бы решить с помощью гипотетической операции Stream::unzip, однако, поскольку мы стремимся к семантической совместимости с std::iter::Iterator в наших операциях, путь к её добавлению в настоящее время неясен.
Следующие реализации потоков предоставляются futures-concurrency:
StreamGroup: Расширяемая группа потоков, работающих как единое целое.ConcurrentStream: Трейт для асинхронных потоков, которые могут конкурентно обрабатывать элементы.tuple:chain,merge,ziparray:chain,merge,zipVec:chain,merge,zip
Поддержка сред выполнения
futures-concurrency не зависит от наличия какого-либо исполнителя среды выполнения. Это позволяет ему работать "из коробки" с любой асинхронной средой выполнения, включая: tokio, async-std, smol, glommio и monoio. Он также поддерживает среды #[no_std], что позволяет использовать его со встроенными асинхронными средами выполнения, такими как embassy.
Флаги функций
Флаг функции std включен по умолчанию. Для нацеливания на среды alloc или no_std вы можете использовать следующую конфигурацию:
[dependencies]
# no_std
futures-concurrency = { version = "7.5.0", default-features = false }
# alloc
futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }
Дополнительное чтение
futures-concurrency разрабатывался в течение нескольких лет. Основной сопровождающий — Йош Вуйтс (Yosh Wuyts), член рабочей группы по асинхронности Rust (Rust Async WG). Подробнее о разработке и идеях, стоящих за futures-concurrency, можно прочитать здесь:
| Ссылка | Краткое описание темы |
|---|---|
| Tree Structured Concurrency - What is structured concurrency | Введение в структурированную конкурентность и её основные концепции |
| Tree Structured Concurrency - Unstructured concurrency example | Пример неструктурированной конкурентности и её проблемы |
| Tree Structured Concurrency - Structured concurrency example | Пример структурированной конкурентности и её преимущества |
| Tree Structured Concurrency - What's the worst that can happen | Потенциальные проблемы неструктурированной конкурентности |
| Tree Structured Concurrency - Applying to your programs | Практическое применение структурированной конкурентности |
| Tree Structured Concurrency - Managed background tasks | Паттерн управления фоновыми задачами |
| Tree Structured Concurrency - Guaranteeing structure | Гарантии структурированной конкурентности |
| Tree Structured Concurrency - Conclusion | Выводы и заключение по структурированной конкурентности |
| Tree Structured Concurrency - Dijkstra | Ссылки на работы Дейкстры о структурированном программировании |
| Why Async Rust | Обоснование использования асинхронного Rust |
| Async Cancellation | Отмена асинхронных операций в Rust |
| EWD Notes on Structured Programming | Классические заметки Дейкстры о структурированном программировании |
| Graphs and Trees | Графы и деревья в компьютерных науках |
| Tree Structured Concurrency - Academia | Академические источники по структурированной конкурентности |
| Tree Structured Concurrency - Recursion | Рекурсия в контексте структурированной конкурентности |
| futures-concurrency docs | Документация крейта futures-concurrency |
| Tree Structured Concurrency - SQL | Аналогии с SQL в структурированной конкурентности |
| smol::Task docs | Документация по Task в крейте smol |
| smol::Task::cancel | Метод отмены задач в smol |
| Tree Structured Concurrency - Flush | Операция flush в контексте конкурентности |
| async_std::BufWriter docs | Буферизованный писатель в async-std |
| tasky implementation | Пример реализации структурированных задач |
| Scoped Tasks | Задачи с ограниченной областью видимости |
| Linearity and Control | Линейность и управление в программировании |
| async-task-group docs | Документация крейта для группировки асинхронных задач |
| Tree Structured Concurrency - State Machine | Структурированные конечные автоматы |
| Google's Rust Journey | Опыт Google по внедрению Rust в 2022 году |
Модули
| Имя | Описание |
|---|---|
array | Вспомогательные функции и типы для массивов фиксированной длины. |
concurrent_stream | Конкурентное выполнение потоков. |
future | Базовая асинхронная функциональность. |
prelude | Предел конкурентности future. |
stream | Компонуемая асинхронная итерация. |
vec | Типы параллельных итераторов для векторов (Vec<T>). |