Streams
Поток (stream) - это асинхронная последовательность значений. Это асинхронный эквивалент Rust'ового std::iter::Iterator, представленный трейтом Stream. Потоки можно итерировать в async функциях. Они также могут быть преобразованы с помощью адаптеров. Tokio предоставляет ряд общих адаптеров в трейте StreamExt.
Tokio предоставляет поддержку потоков в отдельном крейте: tokio-stream.
tokio-stream = "0.1"
информация В настоящее время утилиты потоков Tokio находятся в крейте
tokio-stream. Когда трейтStreamстабилизируется в стандартной библиотеке Rust, утилиты потоков Tokio будут перемещены в крейтtokio.
Итерация
В настоящее время язык программирования Rust не поддерживает асинхронные циклы for. Вместо этого итерация потоков выполняется с использованием цикла while let в паре с StreamExt::next().
use tokio_stream::StreamExt; #[tokio::main] async fn main() { let mut stream = tokio_stream::iter(&[1, 2, 3]); while let Some(v) = stream.next().await { println!("GOT = {:?}", v); } }
Как и итераторы, метод next() возвращает Option<T>, где T - тип значения потока. Получение None указывает на завершение итерации потока.
Mini-Redis broadcast
Давайте рассмотрим немного более сложный пример с использованием клиента Mini-Redis.
Полный код можно найти здесь.
use tokio_stream::StreamExt; use mini_redis::client; async fn publish() -> mini_redis::Result<()> { let mut client = client::connect("127.0.0.1:6379").await?; // Публикуем некоторые данные client.publish("numbers", "1".into()).await?; client.publish("numbers", "two".into()).await?; client.publish("numbers", "3".into()).await?; client.publish("numbers", "four".into()).await?; client.publish("numbers", "five".into()).await?; client.publish("numbers", "6".into()).await?; Ok(()) } async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber.into_stream(); tokio::pin!(messages); while let Some(msg) = messages.next().await { println!("got = {:?}", msg); } Ok(()) } fn dox() { #[tokio::main] async fn main() -> mini_redis::Result<()> { tokio::spawn(async { publish().await }); subscribe().await?; println!("DONE"); Ok(()) } }
Создается задача для публикации сообщений на сервер Mini-Redis в канал "numbers". Затем в главной задаче мы подписываемся на канал "numbers" и отображаем полученные сообщения.
После подписки вызывается into_stream() на возвращенном подписчике. Это потребляет Subscriber, возвращая поток, который выдает сообщения по мере их поступления. Прежде чем мы начнем итерировать сообщения, обратите внимание, что поток закрепляется в стеке с помощью tokio::pin!. Вызов next() на потоке требует, чтобы поток был закреплен. Функция into_stream() возвращает поток, который не закреплен, мы должны явно закрепить его, чтобы итерировать.
информация Значение Rust "закрепляется", когда оно больше не может быть перемещено в памяти. Ключевое свойство закрепленного значения заключается в том, что можно брать указатели на закрепленные данные, и вызывающая сторона может быть уверена, что указатель останется действительным. Эта функция используется
async/awaitдля поддержки заимствования данных между точками.await.
Если мы забудем закрепить поток, мы получим ошибку типа:
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
Если вы столкнулись с таким сообщением об ошибке, попробуйте закрепить значение!
Прежде чем попытаться запустить это, запустите сервер Mini-Redis:
$ mini-redis-server
Затем попробуйте запустить код. Мы увидим сообщения, выведенные в STDOUT.
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
Некоторые ранние сообщения могут быть потеряны из-за гонки между подпиской и публикацией. Программа никогда не завершается. Подписка на канал Mini-Redis остается активной, пока активен сервер.
Давайте посмотрим, как мы можем работать с потоками, чтобы расширить эту программу.
Адаптеры
Функции, которые принимают Stream и возвращают другой Stream, часто называются 'адаптерами потоков', поскольку они являются формой 'шаблона адаптера'. Общие адаптеры потоков включают map, take и filter.
Давайте обновим Mini-Redis так, чтобы он завершался. После получения трех сообщений остановим итерацию сообщений. Это делается с помощью take. Этот адаптер ограничивает поток, чтобы он выдавал не более n сообщений.
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .take(3); Ok(()) } }
Запустив программу снова, мы получим:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
На этот раз программа завершается.
Теперь давайте ограничим поток однозначными числами. Мы проверим это, проверив длину сообщения. Мы используем адаптер filter, чтобы отбросить любое сообщение, которое не соответствует предикату.
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .take(3); Ok(()) } }
Запустив программу снова, мы получим:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
Обратите внимание, что порядок применения адаптеров имеет значение. Вызов filter сначала, затем take отличается от вызова take затем filter.
Наконец, мы приведем в порядок вывод, убрав часть Ok(Message { ... }) вывода. Это делается с помощью map. Поскольку это применяется после filter, мы знаем, что сообщение Ok, поэтому можем использовать unwrap().
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .map(|msg| msg.unwrap().content) .take(3); Ok(()) } }
Теперь вывод:
got = b"1"
got = b"3"
got = b"6"
Другой вариант - объединить шаги filter и map в один вызов с помощью filter_map.
Есть больше доступных адаптеров. См. список здесь.
Реализация Stream
Трейт Stream очень похож на трейт Future.
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } } }
Функция Stream::poll_next() очень похожа на Future::poll, за исключением того, что ее можно вызывать повторно для получения многих значений из потока. Как мы видели в Асинхронность в глубине, когда поток не готов вернуть значение, вместо этого возвращается Poll::Pending. Регистрируется waker задачи. Как только поток должен быть опрошен снова, waker уведомляется.
Метод size_hint() используется так же, как и с итераторами.
Обычно при ручной реализации Stream это делается путем комбинирования future и других потоков. В качестве примера давайте построим на основе future Delay, который мы реализовали в Асинхронность в глубине. Мы преобразуем его в поток, который выдает () три раза с интервалом 10 мс.
#![allow(unused)] fn main() { use tokio_stream::Stream; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::time::Instant; struct Interval { rem: usize, delay: Delay, } struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } impl Interval { fn new() -> Self { Self { rem: 3, delay: Delay { when: Instant::now() } } } } impl Stream for Interval { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { if self.rem == 0 { // Больше нет задержек return Poll::Ready(None); } match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { let when = self.delay.when + Duration::from_millis(10); self.delay = Delay { when }; self.rem -= 1; Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, } } } }
async-stream
Ручная реализация потоков с использованием трейта Stream может быть утомительной. К сожалению, язык программирования Rust еще не поддерживает синтаксис async/await для определения потоков. Это в работе, но еще не готово.
Крейт async-stream доступен как временное решение. Этот крейт предоставляет макрос stream!, который преобразует ввод в поток. Используя этот крейт, вышеуказанный интервал может быть реализован так:
#![allow(unused)] fn main() { use async_stream::stream; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tokio_stream::StreamExt; use std::time::{Duration, Instant}; struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } async fn dox() { let stream = stream! { let mut when = Instant::now(); for _ in 0..3 { let delay = Delay { when }; delay.await; yield (); when += Duration::from_millis(10); } } ; tokio::pin!(stream); while let Some(_) = stream.next().await { } } }