Streams

Поток (stream) - это асинхронная последовательность значений. Это асинхронный эквивалент Rust'ового std::iter::Iterator, представленный трейтом Stream. Потоки можно итерировать в async функциях. Они также могут быть преобразованы с помощью адаптеров. Tokio предоставляет ряд общих адаптеров в трейте StreamExt.

Tokio предоставляет поддержку потоков в отдельном крейте: tokio-stream.

tokio-stream = "0.1"

информация В настоящее время утилиты потоков Tokio находятся в крейте tokio-stream. Когда трейт Stream стабилизируется в стандартной библиотеке Rust, утилиты потоков Tokio будут перемещены в крейт tokio.

Итерация

В настоящее время язык программирования Rust не поддерживает асинхронные циклы for. Вместо этого итерация потоков выполняется с использованием цикла while let в паре с StreamExt::next().

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

Как и итераторы, метод next() возвращает Option<T>, где T - тип значения потока. Получение None указывает на завершение итерации потока.

Mini-Redis broadcast

Давайте рассмотрим немного более сложный пример с использованием клиента Mini-Redis.

Полный код можно найти здесь.

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Публикуем некоторые данные
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

fn dox() {
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}
}

Создается задача для публикации сообщений на сервер Mini-Redis в канал "numbers". Затем в главной задаче мы подписываемся на канал "numbers" и отображаем полученные сообщения.

После подписки вызывается into_stream() на возвращенном подписчике. Это потребляет Subscriber, возвращая поток, который выдает сообщения по мере их поступления. Прежде чем мы начнем итерировать сообщения, обратите внимание, что поток закрепляется в стеке с помощью tokio::pin!. Вызов next() на потоке требует, чтобы поток был закреплен. Функция into_stream() возвращает поток, который не закреплен, мы должны явно закрепить его, чтобы итерировать.

информация Значение Rust "закрепляется", когда оно больше не может быть перемещено в памяти. Ключевое свойство закрепленного значения заключается в том, что можно брать указатели на закрепленные данные, и вызывающая сторона может быть уверена, что указатель останется действительным. Эта функция используется async/await для поддержки заимствования данных между точками .await.

Если мы забудем закрепить поток, мы получим ошибку типа:

error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

Если вы столкнулись с таким сообщением об ошибке, попробуйте закрепить значение!

Прежде чем попытаться запустить это, запустите сервер Mini-Redis:

$ mini-redis-server

Затем попробуйте запустить код. Мы увидим сообщения, выведенные в STDOUT.

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

Некоторые ранние сообщения могут быть потеряны из-за гонки между подпиской и публикацией. Программа никогда не завершается. Подписка на канал Mini-Redis остается активной, пока активен сервер.

Давайте посмотрим, как мы можем работать с потоками, чтобы расширить эту программу.

Адаптеры

Функции, которые принимают Stream и возвращают другой Stream, часто называются 'адаптерами потоков', поскольку они являются формой 'шаблона адаптера'. Общие адаптеры потоков включают map, take и filter.

Давайте обновим Mini-Redis так, чтобы он завершался. После получения трех сообщений остановим итерацию сообщений. Это делается с помощью take. Этот адаптер ограничивает поток, чтобы он выдавал не более n сообщений.

#![allow(unused)]
fn main() {
use mini_redis::client;
use tokio_stream::StreamExt;
async fn subscribe() -> mini_redis::Result<()> {
   let client = client::connect("127.0.0.1:6379").await?;
   let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber
    .into_stream()
    .take(3);
    Ok(())
}
}

Запустив программу снова, мы получим:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

На этот раз программа завершается.

Теперь давайте ограничим поток однозначными числами. Мы проверим это, проверив длину сообщения. Мы используем адаптер filter, чтобы отбросить любое сообщение, которое не соответствует предикату.

#![allow(unused)]
fn main() {
use mini_redis::client;
use tokio_stream::StreamExt;
async fn subscribe() -> mini_redis::Result<()> {
   let client = client::connect("127.0.0.1:6379").await?;
   let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);
    Ok(())
}
}

Запустив программу снова, мы получим:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

Обратите внимание, что порядок применения адаптеров имеет значение. Вызов filter сначала, затем take отличается от вызова take затем filter.

Наконец, мы приведем в порядок вывод, убрав часть Ok(Message { ... }) вывода. Это делается с помощью map. Поскольку это применяется после filter, мы знаем, что сообщение Ok, поэтому можем использовать unwrap().

#![allow(unused)]
fn main() {
use mini_redis::client;
use tokio_stream::StreamExt;
async fn subscribe() -> mini_redis::Result<()> {
   let client = client::connect("127.0.0.1:6379").await?;
   let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);
    Ok(())
}
}

Теперь вывод:

got = b"1"
got = b"3"
got = b"6"

Другой вариант - объединить шаги filter и map в один вызов с помощью filter_map.

Есть больше доступных адаптеров. См. список здесь.

Реализация Stream

Трейт Stream очень похож на трейт Future.

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
}

Функция Stream::poll_next() очень похожа на Future::poll, за исключением того, что ее можно вызывать повторно для получения многих значений из потока. Как мы видели в Асинхронность в глубине, когда поток не готов вернуть значение, вместо этого возвращается Poll::Pending. Регистрируется waker задачи. Как только поток должен быть опрошен снова, waker уведомляется.

Метод size_hint() используется так же, как и с итераторами.

Обычно при ручной реализации Stream это делается путем комбинирования future и других потоков. В качестве примера давайте построим на основе future Delay, который мы реализовали в Асинхронность в глубине. Мы преобразуем его в поток, который выдает () три раза с интервалом 10 мс.

#![allow(unused)]
fn main() {
use tokio_stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::Instant;

struct Interval {
    rem: usize,
    delay: Delay,
}
struct Delay { when: Instant }
impl Future for Delay {
  type Output = ();
  fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
      Poll::Pending
  }  
}

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // Больше нет задержек
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
}

async-stream

Ручная реализация потоков с использованием трейта Stream может быть утомительной. К сожалению, язык программирования Rust еще не поддерживает синтаксис async/await для определения потоков. Это в работе, но еще не готово.

Крейт async-stream доступен как временное решение. Этот крейт предоставляет макрос stream!, который преобразует ввод в поток. Используя этот крейт, вышеуказанный интервал может быть реализован так:

#![allow(unused)]
fn main() {
use async_stream::stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::StreamExt;
use std::time::{Duration, Instant};

struct Delay { when: Instant }
impl Future for Delay {
  type Output = ();
  fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
      Poll::Pending
  }
}
async fn dox() {
let stream =
stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}
;
tokio::pin!(stream);
while let Some(_) = stream.next().await { }
}
}