Async в грубину
На этом этапе мы завершили довольно полный обзор асинхронного Rust и Tokio. Теперь мы углубимся в модель асинхронной выполнения Rust. В самом начале руководства мы намекали, что асинхронный Rust использует уникальный подход. Теперь мы объясним, что это означает.
Future (Футуры)
В качестве краткого повторения рассмотрим очень простую асинхронную функцию. Это ничего нового по сравнению с тем, что уже было рассмотрено в руководстве.
#![allow(unused)] fn main() { use tokio::net::TcpStream; async fn my_async_fn() { println!("hello from async"); let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap(); println!("async TCP operation complete"); } }
Мы вызываем функцию, и она возвращает некоторое значение. Мы вызываем .await для этого значения.
async fn my_async_fn() {} #[tokio::main] async fn main() { let what_is_this = my_async_fn(); // Пока ничего не напечатано. what_is_this.await; // Текст напечатан, сокет установлен и закрыт. }
Значение, возвращаемое my_async_fn(), является future. Future — это значение, которое реализует трейт std::future::Future, предоставляемый стандартной библиотекой. Это значения, которые содержат выполняющуюся асинхронную операцию.
Определение трейта std::future::Future:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; } }
Связанный тип Output — это тип, который future производит после завершения. Тип Pin — это то, как Rust может поддерживать заимствования в async функциях. Подробнее см. в документации стандартной библиотеки.
В отличие от того, как future реализованы в других языках, future в Rust не представляет вычисление, происходящее в фоновом режиме, скорее future в Rust и есть само вычисление. Владелец future отвечает за продвижение вычисления путем опрашивания future. Это делается вызовом Future::poll.
Реализация Future
Давайте реализуем очень простой future. Этот future будет:
- Ждать до определенного момента времени.
- Выводить текст в STDOUT.
- Возвращать строку.
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Пока игнорируем эту строку. cx.waker().wake_by_ref(); Poll::Pending } } } #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }
Async fn как Future
В главной функции мы создаем future и вызываем для него .await. В асинхронных функциях мы можем вызывать .await для любого значения, реализующего Future. В свою очередь, вызов async функции возвращает анонимный тип, реализующий Future. В случае async fn main(), сгенерированный future примерно такой:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; enum MainFuture { // Инициализирован, никогда не опрашивался State0, // Ожидание `Delay`, т.е. строка `future.await`. State1(Delay), // Future завершен. Terminated, } struct Delay { when: Instant }; impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<&'static str> { unimplemented!(); } } impl Future for MainFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { use MainFuture::*; loop { match *self { State0 => { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; *self = State1(future); } State1(ref mut my_future) => { match Pin::new(my_future).poll(cx) { Poll::Ready(out) => { assert_eq!(out, "done"); *self = Terminated; return Poll::Ready(()); } Poll::Pending => { return Poll::Pending; } } } Terminated => { panic!("future polled after completion") } } } } } }
Future в Rust являются машинами состояний. Здесь MainFuture представлен как enum возможных состояний future. Future начинается в состоянии State0. Когда вызывается poll, future пытается продвинуть свое внутреннее состояние как можно дальше. Если future может завершиться, возвращается Poll::Ready, содержащий результат асинхронного вычисления.
Если future не может завершиться, обычно из-за того, что ресурсы, которых он ожидает, не готовы, то возвращается Poll::Pending. Получение Poll::Pending указывает вызывающей стороне, что future завершится позже и вызывающая сторона должна снова вызвать poll позже.
Мы также видим, что future состоят из других future. Вызов poll на внешнем future приводит к вызову poll внутреннего future.
Исполнители
Асинхронные функции Rust возвращают future. Для future должен вызываться poll, чтобы продвигать их состояние. Future состоят из других future. Итак, вопрос: что вызывает poll на самом внешнем future?
Вспомним, что для запуска асинхронных функций они должны быть либо переданы в tokio::spawn, либо быть главной функцией с аннотацией #[tokio::main]. Это приводит к отправке сгенерированного внешнего future в исполнитель Tokio. Исполнитель отвечает за вызов Future::poll на внешнем future, продвигая асинхронное вычисление к завершению.
Mini Tokio
Чтобы лучше понять, как все это сочетается, давайте реализуем нашу собственную минимальную версию Tokio! Полный код можно найти здесь.
use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use futures::task; fn main() { let mut mini_tokio = MiniTokio::new(); mini_tokio.spawn(async { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }); mini_tokio.run(); } struct Delay { when: Instant } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<&'static str> { Poll::Ready("done") } } struct MiniTokio { tasks: VecDeque<Task>, } type Task = Pin<Box<dyn Future<Output = ()> + Send>>; impl MiniTokio { fn new() -> MiniTokio { MiniTokio { tasks: VecDeque::new(), } } /// Создать задачу в экземпляре mini-tokio. fn spawn<F>(&mut self, future: F) where F: Future<Output = ()> + Send + 'static, { self.tasks.push_back(Box::pin(future)); } fn run(&mut self) { let waker = task::noop_waker(); let mut cx = Context::from_waker(&waker); while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { self.tasks.push_back(task); } } } }
Это запускает async-блок. Создается экземпляр Delay с запрошенной задержкой и ожидается. Однако наша реализация пока имеет серьезный недостаток. Наш исполнитель никогда не переходит в режим ожидания. Исполнитель непрерывно циклически опрашивает все созданные future. Большую часть времени future не будут готовы к выполнению работы и снова вернут Poll::Pending. Процесс будет потреблять циклы CPU и в целом будет не очень эффективным.
В идеале мы хотим, чтобы mini-tokio опрашивал future только тогда, когда future может продолжить работу. Это происходит, когда ресурс, которого ожидает задача, становится готовым к выполнению запрошенной операции. Если задача хочет читать данные из TCP-сокета, то мы хотим опрашивать задачу только тогда, когда TCP-сокет получил данные. В нашем случае задача ожидает достижения заданного Instant. В идеале mini-tokio должен опрашивать задачу только после того, как этот момент времени наступит.
Для достижения этого, когда ресурс опрашивается и ресурс не готов, ресурс отправит уведомление, когда перейдет в готовое состояние.
Wakers (Побудители)
Wakers — это недостающий элемент. Это система, с помощью которой ресурс может уведомлять ожидающую задачу о том, что ресурс стал готов к продолжению операции.
Давайте снова посмотрим на определение Future::poll:
#![allow(unused)] fn main() { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; }
Аргумент Context в poll имеет метод waker(). Этот метод возвращает Waker, связанный с текущей задачей. Waker имеет метод wake(). Вызов этого метода сигнализирует исполнителю, что связанная задача должна быть запланирована на выполнение. Ресурсы вызывают wake(), когда переходят в готовое состояние, чтобы уведомить исполнителя, что опрос задачи сможет продвинуться.
Обновление Delay
Мы можем обновить Delay для использования wakers:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use std::thread; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Получаем handle к waker текущей задачи let waker = cx.waker().clone(); let when = self.when; // Создаем поток таймера. thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } waker.wake(); }); Poll::Pending } } } }
Теперь, когда запрошенная длительность истекла, вызывающая задача уведомляется, и исполнитель может обеспечить повторное планирование задачи. Следующий шаг — обновить mini-tokio для прослушивания уведомлений пробуждения.
В нашей реализации Delay все еще осталось несколько проблем. Мы исправим их позже.
предупреждение Когда future возвращает
Poll::Pending, он должен обеспечить, чтобы waker был сигнализирован в какой-то момент. Забывание этого приводит к зависанию задачи на неопределенное время.Забывание разбудить задачу после возврата
Poll::Pendingявляется распространенным источником ошибок.
Вспомним первую итерацию Delay. Вот была реализация future:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; struct Delay { when: Instant } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Пока игнорируем эту строку. cx.waker().wake_by_ref(); Poll::Pending } } } }
Перед возвратом Poll::Pending мы вызвали cx.waker().wake_by_ref(). Это необходимо для соблюдения контракта future. Возвращая Poll::Pending, мы отвечаем за сигнализацию waker. Поскольку мы еще не реализовали поток таймера, мы сигнализировали waker немедленно. Это приведет к немедленному повторному планированию future, повторному выполнению и, вероятно, он еще не будет готов к завершению.
Обратите внимание, что разрешается сигнализировать waker чаще, чем необходимо. В этом конкретном случае мы сигнализируем waker, даже если совсем не готовы продолжить операцию. С этим нет ничего плохого, кроме некоторых потраченных впустую циклов CPU. Однако эта конкретная реализация приведет к активному циклу.
Обновление Mini Tokio
Следующий шаг — обновление Mini Tokio для получения уведомлений waker. Мы хотим, чтобы исполнитель запускал задачи только тогда, когда они пробуждаются, и для этого Mini Tokio будет предоставлять свой собственный waker. Когда waker вызывается, его связанная задача ставится в очередь на выполнение. Mini-Tokio передает этот waker future при его опросе.
Обновленный Mini Tokio будет использовать канал для хранения запланированных задач. Каналы позволяют задачам ставиться в очередь на выполнение из любого потока. Wakers должны быть Send и Sync.
информация Трейты
SendиSync— это маркерные трейты, связанные с параллелизмом, предоставляемые Rust. Типы, которые могут быть отправлены в другой поток, являютсяSend. Большинство типов являютсяSend, но, например,Rc— нет. Типы, к которым можно получить параллельный доступ через неизменяемые ссылки, являютсяSync. Тип может бытьSend, но неSync— хороший примерCell, который можно изменять через неизменяемую ссылку и, следовательно, небезопасно обращаться к нему параллельно.Для получения более подробной информации см. соответствующую главу в книге Rust.
Обновим структуру MiniTokio.
#![allow(unused)] fn main() { use std::sync::mpsc; use std::sync::Arc; struct MiniTokio { scheduled: mpsc::Receiver<Arc<Task>>, sender: mpsc::Sender<Arc<Task>>, } struct Task { // Это будет заполнено вскоре. } }
Wakers являются Sync и могут быть клонированы. Когда вызывается wake, задача должна быть запланирована на выполнение. Для реализации этого у нас есть канал. Когда wake() вызывается на waker, задача помещается в передающую половину канала. Наша структура Task будет реализовывать логику пробуждения. Для этого ей нужно содержать как созданный future, так и половину канала для отправки. Мы помещаем future в структуру TaskFuture вместе с Poll enum для отслеживания последнего результата Future::poll(), что необходимо для обработки ложных пробуждений. Более подробная информация приведена в реализации метода poll() в TaskFuture.
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::sync::mpsc; use std::task::Poll; use std::sync::{Arc, Mutex}; /// Структура, содержащая future и результат последнего вызова его метода `poll`. struct TaskFuture { future: Pin<Box<dyn Future<Output = ()> + Send>>, poll: Poll<()>, } struct Task { // `Mutex` нужен, чтобы `Task` реализовывал `Sync`. Только один поток обращается к `task_future` в любой given время. // `Mutex` не требуется для корректности. Настоящий Tokio не использует мьютекс здесь, но у настоящего Tokio больше строк кода, чем может поместиться на одной странице руководства. task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, } impl Task { fn schedule(self: &Arc<Self>) { self.executor.send(self.clone()); } } }
Чтобы запланировать задачу, Arc клонируется и отправляется через канал. Теперь нам нужно связать нашу функцию schedule с std::task::Waker. Стандартная библиотека предоставляет низкоуровневый API для этого с использованием ручного построения vtable. Эта стратегия обеспечивает максимальную гибкость для реализаторов, но требует кучу небезопасного шаблонного кода. Вместо использования RawWakerVTable напрямую, мы будем использовать утилиту ArcWake, предоставляемую крейтом futures. Это позволяет нам реализовать простой трейт, чтобы представить нашу структуру Task как waker.
Добавьте следующую зависимость в ваш Cargo.toml, чтобы подключить futures.
futures = "0.3"
Затем реализуйте futures::task::ArcWake.
#![allow(unused)] fn main() { use futures::task::{self, ArcWake}; use std::sync::Arc; struct Task {} impl Task { fn schedule(self: &Arc<Self>) {} } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { arc_self.schedule(); } } }
Когда поток таймера выше вызывает waker.wake(), задача помещается в канал. Далее мы реализуем получение и выполнение задач в функции MiniTokio::run().
#![allow(unused)] fn main() { use std::sync::mpsc; use futures::task::{self, ArcWake}; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; struct MiniTokio { scheduled: mpsc::Receiver<Arc<Task>>, sender: mpsc::Sender<Arc<Task>>, } struct TaskFuture { future: Pin<Box<dyn Future<Output = ()> + Send>>, poll: Poll<()>, } struct Task { task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) {} } impl MiniTokio { fn run(&self) { while let Ok(task) = self.scheduled.recv() { task.poll(); } } /// Инициализировать новый экземпляр mini-tokio. fn new() -> MiniTokio { let (sender, scheduled) = mpsc::channel(); MiniTokio { scheduled, sender } } /// Создать задачу в экземпляре mini-tokio. /// /// Данный future оборачивается в оболочку `Task` и помещается в очередь `scheduled`. /// Future будет выполнен при вызове `run`. fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { Task::spawn(future, &self.sender); } } impl TaskFuture { fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture { TaskFuture { future: Box::pin(future), poll: Poll::Pending, } } fn poll(&mut self, cx: &mut Context<'_>) { // Ложные пробуждения разрешены, даже после того, как future вернул `Ready`. Однако опрос future, который уже вернул `Ready`, *не* допускается. По этой причине нам нужно проверить, что future все еще находится в состоянии pending, прежде чем вызывать его. Невыполнение этого может привести к панике. if self.poll.is_pending() { self.poll = self.future.as_mut().poll(cx); } } } impl Task { fn poll(self: Arc<Self>) { // Создаем waker из экземпляра `Task`. Это использует реализацию `ArcWake` сверху. let waker = task::waker(self.clone()); let mut cx = Context::from_waker(&waker); // Никакой другой поток никогда не пытается заблокировать task_future let mut task_future = self.task_future.try_lock().unwrap(); // Опрашиваем внутренний future task_future.poll(&mut cx); } // Создает новую задачу с данным future. // // Инициализирует новую оболочку Task, содержащую данный future, и помещает ее в `sender`. Принимающая половина канала получит задачу и выполнит ее. fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { task_future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), }); let _ = sender.send(task); } } }
Здесь происходит несколько вещей. Во-первых, реализована MiniTokio::run(). Функция работает в цикле, получая запланированные задачи из канала. Когда задачи помещаются в канал при их пробуждении, эти задачи могут продвигаться при выполнении.
Кроме того, функции MiniTokio::new() и MiniTokio::spawn() adjusted для использования канала вместо VecDeque. Когда создаются новые задачи, им передается клон передающей части канала, который задача может использовать для планирования себя в среде выполнения.
Функция Task::poll() создает waker с помощью утилиты ArcWake из крейта futures. Waker используется для создания task::Context. Этот task::Context передается в poll.
Итог
Теперь мы увидели сквозной пример того, как работает асинхронный Rust. Функция async/await Rust основана на трейтах. Это позволяет сторонним крейтам, таким как Tokio, предоставлять детали выполнения.
- Асинхронные операции Rust ленивы и требуют, чтобы вызывающая сторона опрашивала их.
- Wakers передаются future для связи future с задачей, вызывающей его.
- Когда ресурс не готов завершить операцию, возвращается
Poll::Pendingи записывается waker задачи. - Когда ресурс становится готовым, waker задачи уведомляется.
- Исполнитель получает уведомление и планирует задачу на выполнение.
- Задача снова опрашивается, на этот раз ресурс готов и задача продвигается.
Несколько незавершенных моментов
Вспомним, когда мы реализовывали future Delay, мы сказали, что есть еще несколько вещей для исправления. Асинхронная модель Rust позволяет одному future мигрировать между задачами во время выполнения. Рассмотрим следующее:
use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let mut delay = Some(Delay { when }); poll_fn(move |cx| { let mut delay = delay.take().unwrap(); let res = Pin::new(&mut delay).poll(cx); assert!(res.is_pending()); tokio::spawn(async move { delay.await; }); Poll::Ready(()) }).await; }
Функция poll_fn создает экземпляр Future с помощью замыкания. Приведенный фрагмент создает экземпляр Delay, опрашивает его один раз, затем отправляет экземпляр Delay в новую задачу, где он ожидается. В этом примере Delay::poll вызывается более одного раза с разными экземплярами Waker. Когда это происходит, вы должны убедиться, что вызывается wake на Waker, переданном в самый последний вызов poll.
При реализации future критически важно предполагать, что каждый вызов poll может предоставить другой экземпляр Waker. Функция poll должна обновлять любой ранее записанный waker новым.
Наша предыдущая реализация Delay создавала новый поток каждый раз, когда он опрашивался. Это нормально, но может быть очень неэффективно, если он опрашивается слишком часто (например, если вы используете select! над этим future и другим future, оба опрашиваются, когда у любого из них есть событие). Один из подходов к этому — запомнить, создали ли вы уже поток, и создавать новый поток только если вы еще не создали его. Однако если вы делаете это, вы должны обеспечить, чтобы Waker потока обновлялся при последующих вызовах poll, иначе вы не будите самый последний Waker.
Чтобы исправить нашу предыдущую реализацию, мы могли бы сделать что-то вроде этого:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant}; struct Delay { when: Instant, // Это Some, когда мы создали поток, и None otherwise. waker: Option<Arc<Mutex<Waker>>>, } impl Future for Delay { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { // Проверяем текущий момент времени. Если длительность истекла, то этот future завершился, поэтому мы возвращаем `Poll::Ready`. if Instant::now() >= self.when { return Poll::Ready(()); } // Длительность не истекла. Если это первый вызов future, создаем поток таймера. Если поток таймера уже запущен, обеспечиваем, чтобы сохраненный `Waker` соответствовал waker текущей задачи. if let Some(waker) = &self.waker { let mut waker = waker.lock().unwrap(); // Проверяем, соответствует ли сохраненный waker waker текущей задачи. // Это необходимо, поскольку экземпляр future `Delay` может переместиться в другую задачу между вызовами `poll`. Если это происходит, waker, содержащийся в данном `Context`, будет отличаться, и мы должны обновить наш сохраненный waker, чтобы отразить это изменение. if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } } else { let when = self.when; let waker = Arc::new(Mutex::new(cx.waker().clone())); self.waker = Some(waker.clone()); // Это первый раз, когда вызывается `poll`, создаем поток таймера. thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } // Длительность истекла. Уведомляем вызывающую сторону, вызывая waker. let waker = waker.lock().unwrap(); waker.wake_by_ref(); }); } // К этому моменту waker сохранен и поток таймера запущен. // Длительность не истекла (напомним, что мы проверили это в первую очередь), следовательно, future не завершился, поэтому мы должны вернуть `Poll::Pending`. // // Контракт трейта `Future` требует, чтобы при возврате `Pending` future обеспечивал, чтобы данный waker был сигнализирован, когда future должен быть опрошен снова. В нашем случае, возвращая `Pending` здесь, мы обещаем, что мы вызовем данный waker, включенный в аргумент `Context`, once запрошенная длительность истечет. Мы обеспечиваем это, создавая поток таймера выше. // // Если мы забудем вызвать waker, задача зависнет на неопределенное время. Poll::Pending } } }
Это немного сложно, но идея в том, что при каждом вызове poll future проверяет, соответствует ли предоставленный waker ранее записанному waker. Если два waker совпадают, то больше ничего делать не нужно. Если они не совпадают, то записанный waker должен быть обновлен.
Утилита Notify
Мы продемонстрировали, как future Delay может быть реализован вручную с использованием wakers. Wakers являются основой того, как работает асинхронный Rust. Обычно нет необходимости опускаться до этого уровня. Например, в случае Delay мы могли бы реализовать его полностью с помощью async/await, используя утилиту tokio::sync::Notify. Эта утилита предоставляет базовый механизм уведомления задач. Она обрабатывает детали wakers, включая обеспечение того, что записанный waker соответствует текущей задаче.
Используя Notify, мы можем реализовать функцию delay с помощью async/await следующим образом:
#![allow(unused)] fn main() { use tokio::sync::Notify; use std::sync::Arc; use std::time::{Duration, Instant}; use std::thread; async fn delay(dur: Duration) { let when = Instant::now() + dur; let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } notify_clone.notify_one(); }); notify.notified().await; } }