Потоки (Streams): Фьючерсы в последовательности

Вспомните, как мы использовали получатель (receiver) для нашего асинхронного канала ранее в этой главе в разделе "Передача сообщений". Асинхронный метод recv производит последовательность элементов с течением времени. Это экземпляр гораздо более общего паттерна, известного как поток (stream). Многие концепции естественным образом представляются в виде потоков: элементы, становящиеся доступными в очереди; части данных, постепенно загружаемые из файловой системы, когда полный набор данных слишком велик для памяти компьютера; или данные, поступающие по сети с течением времени. Поскольку потоки являются фьючерсами, мы можем использовать их с любым другим типом фьючерсов и комбинировать их интересными способами. Например, мы можем группировать события в пакеты, чтобы избежать слишком частых сетевых вызовов, устанавливать таймауты для последовательностей длительных операций или регулировать (throttle) события пользовательского интерфейса, чтобы избежать ненужной работы.

Мы видели последовательность элементов в Главе 13, когда рассматривали типаж Iterator в разделе ["Типаж Iterator и метод next"], но между итераторами и получателем асинхронного канала есть два различия. Первое различие — это время: итераторы являются синхронными, а получатель канала — асинхронным. Второе различие — это API. При работе напрямую с Iterator мы вызываем его синхронный метод next. В случае же с потоком trpl::Receiver мы вызывали асинхронный метод recv. В остальном эти API кажутся очень похожими, и это сходство не случайно. Поток похож на асинхронную форму итерации. Однако в то время как trpl::Receiver конкретно ожидает получения сообщений, API потока общего назначения гораздо шире: он предоставляет следующий элемент, как это делает Iterator, но асинхронно.

Сходство между итераторами и потоками в Rust означает, что мы фактически можем создать поток из любого итератора. Как и с итератором, мы можем работать с потоком, вызывая его метод next, а затем ожидая (await) выходное значение, как в Листинге 17-21, который пока не компилируется.

Файл: src/main.rs

#![allow(unused)]
fn main() {
// [Этот код не компилируется!]
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
}

Листинг 17-21: Создание потока из итератора и вывод его значений

Мы начинаем с массива чисел, который преобразуем в итератор, а затем вызываем map, чтобы удвоить все значения. Затем мы преобразуем итератор в поток с помощью функции trpl::stream_from_iter. Далее мы перебираем элементы в потоке по мере их поступления с помощью цикла while let.

К сожалению, при попытке запустить код он не компилируется и вместо этого сообщает, что метод next недоступен:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Как объясняется в этом выводе, причина ошибки компилятора в том, что нам нужен соответствующий типаж в области видимости, чтобы использовать метод next. Учитывая наше обсуждение, вы могли бы разумно предположить, что этим типажом является Stream, но на самом деле это StreamExt. Сокращение от extension (расширение), Ext — это распространенный паттерн в сообществе Rust для расширения одного типажа другим.

Типаж Stream определяет низкоуровневый интерфейс, который фактически объединяет типажи Iterator и Future. StreamExt предоставляет набор API более высокого уровня поверх Stream, включая метод next, а также другие вспомогательные методы, подобные тем, что предоставляются типажом Iterator. Stream и StreamExt пока не являются частью стандартной библиотеки Rust, но большинство крейтов экосистемы используют похожие определения.

Исправление ошибки компилятора заключается в добавлении оператора use для trpl::StreamExt, как в Листинге 17-22.

Файл: src/main.rs

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --snip--

Листинг 17-22: Успешное использование итератора в качестве основы для потока

Когда все эти части собраны вместе, этот код работает так, как мы хотим! Более того, теперь, когда StreamExt находится в области видимости, мы можем использовать все его вспомогательные методы, так же как и с итераторами.