Потоки (Streams): Фьючерсы в последовательности
Вспомните, как мы использовали получатель (receiver) для нашего асинхронного канала ранее в этой главе в разделе "Передача сообщений". Асинхронный метод recv производит последовательность элементов с течением времени. Это экземпляр гораздо более общего паттерна, известного как поток (stream). Многие концепции естественным образом представляются в виде потоков: элементы, становящиеся доступными в очереди; части данных, постепенно загружаемые из файловой системы, когда полный набор данных слишком велик для памяти компьютера; или данные, поступающие по сети с течением времени. Поскольку потоки являются фьючерсами, мы можем использовать их с любым другим типом фьючерсов и комбинировать их интересными способами. Например, мы можем группировать события в пакеты, чтобы избежать слишком частых сетевых вызовов, устанавливать таймауты для последовательностей длительных операций или регулировать (throttle) события пользовательского интерфейса, чтобы избежать ненужной работы.
Мы видели последовательность элементов в Главе 13, когда рассматривали типаж Iterator в разделе ["Типаж Iterator и метод next"], но между итераторами и получателем асинхронного канала есть два различия. Первое различие — это время: итераторы являются синхронными, а получатель канала — асинхронным. Второе различие — это API. При работе напрямую с Iterator мы вызываем его синхронный метод next. В случае же с потоком trpl::Receiver мы вызывали асинхронный метод recv. В остальном эти API кажутся очень похожими, и это сходство не случайно. Поток похож на асинхронную форму итерации. Однако в то время как trpl::Receiver конкретно ожидает получения сообщений, API потока общего назначения гораздо шире: он предоставляет следующий элемент, как это делает Iterator, но асинхронно.
Сходство между итераторами и потоками в Rust означает, что мы фактически можем создать поток из любого итератора. Как и с итератором, мы можем работать с потоком, вызывая его метод next, а затем ожидая (await) выходное значение, как в Листинге 17-21, который пока не компилируется.
Файл: src/main.rs
#![allow(unused)] fn main() { // [Этот код не компилируется!] let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }
Листинг 17-21: Создание потока из итератора и вывод его значений
Мы начинаем с массива чисел, который преобразуем в итератор, а затем вызываем map, чтобы удвоить все значения. Затем мы преобразуем итератор в поток с помощью функции trpl::stream_from_iter. Далее мы перебираем элементы в потоке по мере их поступления с помощью цикла while let.
К сожалению, при попытке запустить код он не компилируется и вместо этого сообщает, что метод next недоступен:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Как объясняется в этом выводе, причина ошибки компилятора в том, что нам нужен соответствующий типаж в области видимости, чтобы использовать метод next. Учитывая наше обсуждение, вы могли бы разумно предположить, что этим типажом является Stream, но на самом деле это StreamExt. Сокращение от extension (расширение), Ext — это распространенный паттерн в сообществе Rust для расширения одного типажа другим.
Типаж Stream определяет низкоуровневый интерфейс, который фактически объединяет типажи Iterator и Future. StreamExt предоставляет набор API более высокого уровня поверх Stream, включая метод next, а также другие вспомогательные методы, подобные тем, что предоставляются типажом Iterator. Stream и StreamExt пока не являются частью стандартной библиотеки Rust, но большинство крейтов экосистемы используют похожие определения.
Исправление ошибки компилятора заключается в добавлении оператора use для trpl::StreamExt, как в Листинге 17-22.
Файл: src/main.rs
use trpl::StreamExt; fn main() { trpl::block_on(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; // --snip--
Листинг 17-22: Успешное использование итератора в качестве основы для потока
Когда все эти части собраны вместе, этот код работает так, как мы хотим! Более того, теперь, когда StreamExt находится в области видимости, мы можем использовать все его вспомогательные методы, так же как и с итераторами.