Акторы с Tokio
Опубликовано 2021-02-13 Автор: Alice Ryhl Источник
Эта статья о создании акторов напрямую с помощью Tokio, без использования каких-либо библиотек акторов, таких как Actix. Оказывается, это довольно легко сделать, однако есть некоторые детали, о которых вам следует знать:
- Где разместить вызов
tokio::spawn. - Структура с методом
runvs отдельная функция. - Handle'ы (дескрипторы) для актора.
- Противодавление (Backpressure) и ограниченные каналы.
- Плавное завершение работы (Graceful shutdown).
Методы, описанные в этой статье, должны работать с любым исполнителем (executor), но для простоты мы будем говорить только о Tokio. Есть некоторое пересечение с главами о порождении задач и каналах из учебника по Tokio, и я рекомендую также прочитать эти главы.
Прежде чем мы сможем говорить о том, как писать актор, нам нужно знать, что такое актор. Основная идея актора заключается в порождении самодостаточной задачи, которая выполняет некоторую работу независимо от других частей программы. Обычно эти акторы общаются с остальной частью программы с помощью каналов передачи сообщений. Поскольку каждый актор работает независимо, программы, разработанные с их использованием, естественно параллельны.
Распространенный вариант использования акторов — назначить актору эксклюзивное владение некоторым ресурсом, которым вы хотите делиться, а затем позволить другим задачам получать доступ к этому ресурсу косвенно, общаясь с актором. Например, если вы реализуете сервер чата, вы можете породить задачу для каждого подключения и главную задачу, которая маршрутизирует сообщения чата между другими задачами. Это полезно, потому что главная задача может избежать необходимости работать с сетевым вводом-выводом, а задачи подключения могут сосредоточиться исключительно на работе с сетевым вводом-выводом.
Эта статья также доступна в виде доклада на YouTube.
Рецепт
Актор разделен на две части: задача (task) и handle (дескриптор). Задача — это независимо порожденная задача Tokio, которая фактически выполняет обязанности актора, а handle — это структура, которая позволяет вам общаться с задачей.
Рассмотрим простой актор. Актор внутренне хранит счетчик, который используется для получения какого-то уникального идентификатора. Базовая структура актора будет выглядеть примерно так:
#![allow(unused)] fn main() { use tokio::sync::{oneshot, mpsc}; struct MyActor { receiver: mpsc::Receiver<ActorMessage>, next_id: u32, } enum ActorMessage { GetUniqueId { respond_to: oneshot::Sender<u32>, }, } impl MyActor { fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self { MyActor { receiver, next_id: 0, } } fn handle_message(&mut self, msg: ActorMessage) { match msg { ActorMessage::GetUniqueId { respond_to } => { self.next_id += 1; // `let _ =` игнорирует любые ошибки при отправке. // // Это может произойти, если макрос `select!` // используется для отмены ожидания ответа. let _ = respond_to.send(self.next_id); }, } } } async fn run_my_actor(mut actor: MyActor) { while let Some(msg) = actor.receiver.recv().await { actor.handle_message(msg); } } }
Теперь, когда у нас есть сам актор, нам также нужен handle к актору. Handle — это объект, который другие части кода могут использовать для общения с актором, и он же поддерживает жизнь актора.
Handle будет выглядеть так:
#![allow(unused)] fn main() { #[derive(Clone)] pub struct MyActorHandle { sender: mpsc::Sender<ActorMessage>, } impl MyActorHandle { pub fn new() -> Self { let (sender, receiver) = mpsc::channel(8); let actor = MyActor::new(receiver); tokio::spawn(run_my_actor(actor)); Self { sender } } pub async fn get_unique_id(&self) -> u32 { let (send, recv) = oneshot::channel(); let msg = ActorMessage::GetUniqueId { respond_to: send, }; // Игнорируем ошибки отправки. Если эта отправка не удалась, // то и recv.await ниже тоже не удастся. Нет причины проверять // одну и ту же ошибку дважды. let _ = self.sender.send(msg).await; recv.await.expect("Задача актора была убита") } } }
Давайте внимательнее рассмотрим различные части этого примера.
ActorMessage. Перечисление ActorMessage определяет типы сообщений, которые мы можем отправить актору. Используя перечисление, мы можем иметь много разных типов сообщений, и каждый тип сообщения может иметь свой собственный набор аргументов. Мы возвращаем значение отправителю, используя канал oneshot, который является каналом передачи сообщений, позволяющим отправить ровно одно сообщение.
В примере выше мы сопоставляем шаблон с перечислением внутри метода handle_message для структуры актора, но это не единственный способ структурирования этого. Можно также сопоставлять шаблон с перечислением в функции run_my_actor. Каждая ветвь в этом сопоставлении может затем вызывать различные методы, такие как get_unique_id, для объекта актора.
Ошибки при отправке сообщений. При работе с каналами не все ошибки являются фатальными. Из-за этого в примере иногда используется let _ =, чтобы игнорировать ошибки. Как правило, операция отправки в канал завершается неудачей, если получатель был удален.
Первый случай этого в нашем примере — строка в акторе, где мы отвечаем на отправленное нам сообщение. Это может произойти, если получатель больше не заинтересован в результате операции, например, если задача, отправившая сообщение, могла быть убита.
Завершение работы актора. Мы можем определить, когда актор должен завершить работу, посмотрев на сбои при получении сообщений. В нашем примере это происходит в следующем цикле while:
#![allow(unused)] fn main() { while let Some(msg) = actor.receiver.recv().await { actor.handle_message(msg); } }
Когда все отправители для получателя удалены, мы знаем, что больше никогда не получим сообщений и, следовательно, можем завершить работу актора. Когда это происходит, вызов .recv() возвращает None, и поскольку он не соответствует шаблону Some(msg), цикл while завершается, и функция возвращает управление.
#[derive(Clone)]. Структура MyActorHandle реализует типаж Clone. Она может это делать, потому что mpsc означает, что это канал с несколькими отправителями и одним получателем (multiple-producer, single-consumer). Поскольку канал допускает несколько производителей, мы можем свободно клонировать наш handle к актору, позволяя нам общаться с ним из нескольких мест.
Метод run в структуре
В приведенном выше примере используется функция верхнего уровня, не определенная ни для какой структуры, в качестве того, что мы порождаем как задачу Tokio. Однако многие считают более естественным определить метод run непосредственно для структуры MyActor и порождать его. Это, конечно, тоже работает, но причина, по которой я привожу пример с функцией верхнего уровня, заключается в том, что она более естественно подводит вас к подходу, который не создает много проблем с временами жизни.
Чтобы понять почему, я подготовил пример того, что часто придумывают люди, незнакомые с этим шаблоном.
#![allow(unused)] fn main() { impl MyActor { fn run(&mut self) { tokio::spawn(async move { while let Some(msg) = self.receiver.recv().await { self.handle_message(msg); } }); } pub async fn get_unique_id(&self) -> u32 { let (send, recv) = oneshot::channel(); let msg = ActorMessage::GetUniqueId { respond_to: send, }; // Игнорируем ошибки отправки. Если эта отправка не удалась, // то и recv.await ниже тоже не удастся. Нет причины проверять // одну и ту же ошибку дважды. let _ = self.sender.send(msg).await; recv.await.expect("Задача актора была убита") } } }
... и без отдельного MyActorHandle
Два источника проблем в этом примере:
- Вызов
tokio::spawnнаходится внутриrun. - Актор и handle — это одна и та же структура.
Первая проблема вызывает затруднения, потому что функция tokio::spawn требует, чтобы аргумент был 'static. Это означает, что новая задача должна владеть всем внутри себя, что является проблемой, потому что метод заимствует self, а значит, он не может передать владение self новой задаче.
Вторая проблема вызывает затруднения, потому что Rust обеспечивает принцип единоличного владения. Если вы объедините и актор, и handle в одну структуру, вы (по крайней мере, с точки зрения компилятора) предоставляете каждому handle доступ к полям, принадлежащим задаче актора. Например, целое число next_id должно принадлежать только задаче актора и не должно быть доступно напрямую ни из одного из handle'ов.
Тем не менее, существует рабочая версия. Исправив две вышеуказанные проблемы, вы получите следующее:
#![allow(unused)] fn main() { impl MyActor { async fn run(&mut self) { while let Some(msg) = self.receiver.recv().await { self.handle_message(msg); } } } impl MyActorHandle { pub fn new() -> Self { let (sender, receiver) = mpsc::channel(8); let actor = MyActor::new(receiver); tokio::spawn(async move { actor.run().await }); Self { sender } } } }
Это работает идентично функции верхнего уровня. Обратите внимание, что, строго говоря, можно написать версию, где tokio::spawn находится внутри run, но я не рекомендую этот подход.
Вариации на тему
Актор, который я использовал в качестве примера в этой статье, использует парадигму запрос-ответ для сообщений, но вы не обязаны делать это так. В этом разделе я дам некоторые идеи о том, как вы можете изменить эту концепцию.
Без ответов на сообщения
В примере, который я использовал для введения концепции, включался ответ на сообщения, отправленные через канал oneshot, но вам не всегда нужен ответ вообще. В этих случаях нет ничего плохого в том, чтобы просто не включать канал oneshot в перечисление сообщений. Когда в канале есть место, это даже позволит вам вернуться из отправки до того, как сообщение будет обработано.
Вы все равно должны убедиться, что используете ограниченный канал (bounded channel), чтобы количество сообщений, ожидающих в канале, не росло без ограничений. В некоторых случаях это будет означать, что отправка по-прежнему должна быть асинхронной функцией, чтобы обрабатывать случаи, когда операции отправки необходимо ждать больше места в канале.
Однако есть альтернатива созданию асинхронного метода отправки. Вы можете использовать метод try_send и обрабатывать ошибки отправки, просто убивая актор. Это может быть полезно в случаях, когда актор управляет TcpStream, пересылая любые отправленные вами сообщения в подключение. В этом случае, если запись в TcpStream не успевает, вы можете просто закрыть соединение.
Несколько handle-структур для одного актора
Если актору нужно получать сообщения из разных мест, вы можете использовать несколько handle-структур, чтобы обеспечить отправку определенных сообщений только из определенных мест.
При этом вы все равно можете повторно использовать тот же канал mpsc внутри, с перечислением, которое содержит все возможные типы сообщений. Если вы хотите использовать для этой цели отдельные каналы, актор может использовать tokio::select! для получения из нескольких каналов одновременно.
#![allow(unused)] fn main() { loop { tokio::select! { Some(msg) = chan1.recv() => { // обработать msg }, Some(msg) = chan2.recv() => { // обработать msg }, else => break, } } }
Вам нужно быть осторожным с обработкой случаев, когда каналы закрыты, так как их метод recv в этом случае немедленно возвращает None. К счастью, макрос tokio::select! позволяет вам обработать этот случай, предоставляя шаблон Some(msg). Если закрыт только один канал, эта ветвь отключается, и прием из другого канала продолжается. Когда оба закрыты, выполняется ветвь else, и используется break для выхода из цикла.
Акторы, отправляющие сообщения другим акторам
Нет ничего плохого в том, что акторы отправляют сообщения другим акторам. Для этого вы можете просто дать одному актору handle какого-то другого актора.
Вам нужно быть немного осторожным, если ваши акторы образуют цикл, потому что, удерживая handle-структуры друг друга, последний отправитель никогда не удаляется, предотвращая завершение работы. Чтобы обработать этот случай, вы можете сделать так, чтобы один из акторов имел две handle-структуры с отдельными каналами mpsc, но с tokio::select!, который выглядит так:
#![allow(unused)] fn main() { loop { tokio::select! { opt_msg = chan1.recv() => { let msg = match opt_msg { Some(msg) => msg, None => break, }; // обработать msg }, Some(msg) = chan2.recv() => { // обработать msg }, } } }
Вышеуказанный цикл всегда завершится, если chan1 закрыт, даже если chan2 все еще открыт. Если chan2 — это канал, который является частью цикла акторов, это разрывает цикл и позволяет акторам завершить работу.
Альтернатива — просто вызвать abort для одного из акторов в цикле.
Несколько акторов, использующих один handle
Так же, как вы можете иметь несколько handle'ов на актор, вы также можете иметь несколько акторов на handle. Наиболее распространенный пример этого — при обработке подключения, такого как TcpStream, где вы обычно порождаете две задачи: одну для чтения и одну для записи. При использовании этого шаблона вы делаете задачи чтения и записи как можно более простыми — их единственная задача — выполнять ввод-вывод. Задача чтения будет просто отправлять любые полученные сообщения в какую-то другую задачу, обычно другому актору, а задача записи будет просто пересылать любые полученные сообщения в подключение.
Этот шаблон очень полезен, потому что он изолирует сложность, связанную с выполнением ввода-вывода, а это означает, что остальная часть программы может притворяться, что запись чего-либо в подключение происходит мгновенно, хотя фактическая запись происходит некоторое время спустя, когда актор обрабатывает сообщение.
Остерегайтесь циклов
Я уже немного говорил о циклах в разделе «Акторы, отправляющие сообщения другим акторам», где обсуждал завершение работы акторов, которые образуют цикл. Однако завершение работы — не единственная проблема, которую могут вызвать циклы, потому что цикл также может привести к взаимоблокировке (deadlock), когда каждый актор в цикле ждет, пока следующий актор получит сообщение, но этот следующий актор не получит это сообщение, пока его следующий актор не получит сообщение, и так далее.
Чтобы избежать такой взаимоблокировки, вы должны убедиться, что нет циклов каналов с ограниченной емкостью. Причина этого в том, что метод send для ограниченного канала не возвращается немедленно. Каналы, чей метод send всегда возвращается немедленно, не учитываются в такого рода цикле, поскольку вы не можете заблокироваться на такой отправке.
Обратите внимание, что это означает, что канал oneshot не может быть частью взаимоблокированного цикла, поскольку его метод send всегда возвращается немедленно. Также обратите внимание, что если вы используете try_send вместо send для отправки сообщения, это также не может быть частью взаимоблокированного цикла.
Спасибо matklad за указание на проблемы с циклами и взаимоблокировками.