Древовидная структурированная конкурентность II: Замена фоновых задач акторами

2025-07-02 Yoshua Wuyts Источник

  • Структурированная конкурентность
  • Проблема фоновых задач
  • Паттерн актора
  • Чем акторы отличаются от глобальных переменных?
  • Заключение
  • Приложение А: Шаблон паттерна актора

Структурированная конкурентность

(Древовидная) структурированная конкурентность — это здорово, потому что она значительно упрощает параллельные программы. Она значительно уменьшает, если не откровенно устраняет возможность логических состояний гонки из-за проблем конкурентности. И концептуально это не так сложно, поскольку мы можем закодировать структурированную конкурентность всего двумя правилами:

  1. Каждое дочернее вычисление (кроме корневого) должно иметь ровно одного логического родителя в любой момент времени во время выполнения.
  2. Каждое родительское вычисление может иметь несколько дочерних вычислений, выполняющихся конкурентно в любой момент.

Каждый ребенок всегда должен иметь ровно одного родителя. Но каждый родитель может иметь несколько детей. Эти два правила позволяют нам вывести третье правило, которое находится в основе системы:

  • Дочерние вычисления никогда не переживут своих родителей.

Если мы применим эти правила, мы заметим, что граф вызовов программы естественным образом образует дерево. В этом нет ничего особенного, поскольку неконкурентные программы уже работают таким образом. Если это не интуитивно: поймите, что, например, flame graphs и flame charts — не что иное, как визуализация данных в форме дерева.

Проблема фоновых задач

Один из самых распространенных вопросов, которые задают люди, изучая структурированную конкурентность:

«Как мне запустить работу после возврата из функции?»

Это справедливый вопрос! Распространенный пример — при реализации HTTP-сервера: вы можете захотеть логировать метрики для каждого запроса, но вы не хотите задерживать отправку ответа до завершения логирования. В этот момент люди часто прибегают к неструктурированной конкурентности в виде фоновых задач, часто используя какую-то форму task::spawn:

#![allow(unused)]
fn main() {
/// Некий HTTP-обработчик
async fn handler(req: Request, _context: ()) -> Result<Response> {
    // Получить тело из запроса
    let body = req.body().try_to_string()?;

    // Создать отсоединенную фоновую задачу,
    // которая выполняет некоторую работу в фоне
    task::spawn(async {
        background_work(body).await.unwrap();
    });

    // Сконструировать ответ и вернуть
    // его из обработчика
    Ok(Response::Ok())
}
}

В этом примере task::spawn создает фоновую задачу, которой не владеет ни один родитель. Это нарушает второе правило структурированной конкурентности: каждое вычисление всегда должно иметь одного логического родителя. Мы можем видеть, что это проблема, если background_work завершится неудачно. Сейчас, если она ошибочна, у нас нет способа распространить ошибку или повторить операцию, так что все, что мы можем реалистично сделать, — это позволить ей упасть (crash).

В этом примере у нас также нет способа отменить работу, выполняемую в задаче, поскольку она не привязана ни к какому логическому родителю. Но несмотря на эти проблемы, сам вариант использования также очень разумен: нам не следует ждать завершения фоновой работы перед отправкой ответа. Итак: что нам делать?

Паттерн актора

Мы хотим, чтобы работа происходила в фоне, одновременно будучи привязанной к некоторому родительскому вычислению. Я считаю, что правильный способ решить это — использовать акторы: типы, которым мы можем отправлять данные и которые будут планировать работу для нас.

В таких фреймворках, как actix и choo, это делается с помощью сообщений, которые отправляются с использованием каналов или эмиттеров событий. Но в языках, таких как Swift, поддержка акторов встроена непосредственно в язык и relies on методы вместо сообщений.

Но для того, что мы пытаемся сделать здесь, нам на самом деле не нужны какие-либо навороченные библиотеки или фреймворки: мы можем легко построить своих собственных акторов, используя структуры, блоки impl и каналы. Вот базовый шаблон, который вы можете использовать для реализации своих собственных акторов. Без комментариев это всего около 15 строк:

#![allow(unused)]
fn main() {
// Используем библиотеки `async-channel` и `futures-concurrency`
use async_channel::{self as channel, Sender, Receiver};
use futures_concurrency::prelude::*;

/// Тип данных, которые мы будем отправлять нашему актору.
/// Это просто простой псевдоним для типа `Body`
/// нашего HTTP-фреймворка.
type Message = Body;

/// Наш пользовательский тип актора
struct Actor(Receiver<Message>);

/// Handle (дескриптор) к актору, который можно
/// использовать для планирования сообщений.
type Handle = Sender<Message>;

impl Actor {
    /// Создать новый экземпляр `Actor`
    fn new(capacity: usize) -> (Self, Handle) {
        let (sender, receiver) = channel::bounded(capacity);
        (Self(receiver), sender)
    }

    /// Начать прослушивание входящих сообщений
    /// и обрабатывать их конкурентно
    async fn run(&mut self) -> Result<()> {
        // Используем `ConcurrentStream` из `futures-concurrency`
        // чтобы брать работу из канала и выполнять ее
        // конкурентно. Чтобы ограничить количество конкурентности,
        // вы можете использовать предоставленный комбинатор `.limit`.
        self.0.co().try_for_each(|msg| {
            // Работа планируется здесь
            background_work(body).await?;
            Ok(())
        }).await?;
        Ok(())
    }
}
}

Теперь, чтобы использовать это, нам нужно создать экземпляр, запустить его конкурентно с нашим HTTP-сервером и убедиться, что обработчики запросов на сервере имеют к нему доступ. Я больше всего знаком с HTTP-фреймворком Tide, поэтому я буду использовать его здесь, но ваш любимый фреймворк, вероятно, позволяет сделать что-то подобное. Вот способ связать все это вместе:

use futures_concurrency::prelude::*;

#[async_main]
async fn main() -> server::Result<()> {
    // Создать экземпляр нашего актора
    let (mut actor, handle) = Actor::new(8); // емкость 8
    
    // Создать экземпляр нашего сервера
    // и передать ему handle актора
    let mut app = Server::with_context(handle);
    app.at("/submit").post(handler);

    // Запустить и актор, и сервер
    // и выполнить их конкурентно
    let a = app.listen("127.0.0.1:8080");
    let b = actor.run();
    (a, b).try_join().await?;
    
    Ok(())
}

И теперь, когда наш сервер имеет handler, мы можем изменить нашу функцию-обработчик запроса, чтобы планировать работу на акторе, а не в висячей задаче:

#![allow(unused)]
fn main() {
async fn handler(req: Request, handle: Handle) -> Result<Response> {
    let body = req.body().try_to_string()?;
    handle.send(body).await?; // ← Отправить работу актору
    Ok(Response::Ok())
}
}

Хотя это все еще полагается на выполнение асинхронной операции, которая может потенциально завершиться неудачей, это все же значимо. Вместо того чтобы ждать завершения работы, теперь мы ждем только постановки работы в очередь. И если в системе не происходит чего-то ужасно неправильного, это должно происходить практически мгновенно. А если по какой-то причине это не так: именно поэтому мы всегда добавляем таймауты к нашим вычислениям, верно? …верно?

Чем акторы отличаются от глобальных переменных?

Этот раздел был добавлен 2025-06-02, после публикации поста.

Мне несколько раз задавали вопрос, в чем разница между паттерном актора, показанным в этом посте, и глобальным пулом задач, который управляет порожденными задачами. В конце концов: если взять программу как корень, оба варианта в конечном итоге выглядят как дерево.

Разница между ними заключается в том, что при структурированной конкурентности мы рассматриваем как корень не программу, а функцию main. Паттерн актора в этом посте превращает недостижимый глобальный пул задач в достижимый пул задач с локальностью, которой можно управлять как угодно. Сай Бранд (Sy Brand) особенно хорошо это выразил; слегка перефразируя:

[…] это «о, это структурировано, пока вы смотрите на это таким образом» пахнет как «ну, моя программа не течет память, так как она вся будет возвращена, когда она завершится».

Смысл структурированной конкурентности в том, что всем всегда можно управлять и все достижимо за счет того, что все вычисления существуют в одном дереве. Это делает возможным всегда обрабатывать ошибки, перезапускать экземпляры и изящно завершать работу.

Заключение

В этом посте мы ответили на один из самых распространенных вопросов, которые возникают у людей при первом знакомстве со структурированной конкурентностью: «Как мне планировать фоновую работу без доверия на висячие задачи?»

Самый простой ответ на этот вопрос — акторы: типы, которые предоставляют некоторый (разделяемый) handle, который можно использовать для получения работы. Мы показали, как определить актора примерно в 15 строках кода, как интегрировать его с HTTP-сервером и, наконец, использовать его для замены существующей фоновой задачи.

Я думаю о структурированной конкурентности так же, как о безопасности памяти: чем больше мы делаем ее по умолчанию, тем меньше проблем с конкурентностью у людей в целом. И это кажется важным, потому что параллельные, асинхронные и конкурентные вычисления склоняются чтобы быть одними из самых сложных для осмысления и отладки.

Приложение А: Шаблон паттерна актора

#![allow(unused)]
fn main() {
use async_channel::{self as channel, Sender, Receiver};
use futures_concurrency::prelude::*;

type Message = ();
type Handle = Sender<Message>;

struct Actor(Receiver<Message>);
impl Actor {
    fn new(capacity: usize) -> (Self, Handle) {
        let (sender, receiver) = channel::bounded(capacity);
        (Self(receiver), sender)
    }

    async fn run(&mut self) -> Result<()> {
        self.0.co().try_for_each(|msg| {
            todo!("запланировать работу здесь")
        }).await
    }
}
}