Проблема с аллокацией Waker

— 2025-05-23 Yoshua Wuyts источник

  • введение
  • эффективное конкурентное выполнение требует промежуточных Waker'ов
  • промежуточные Waker'ы требуют аллокаций
  • заключение

Введение

Весь смысл фьючерсов и системы async/.await в Rust — это введение двух новых возможностей: произвольного конкурентного выполнения вычислений и произвольной отмены вычислений. Разница между блокирующими и неблокирующими вычислениями не важна, если мы затем не используем её для планирования работы параллельно.

Но есть проблема — чтобы планировать работу параллельно, фьючерсы имеют два плохих варианта организации своей внутренней работы:

  1. Без аллокаций у фьючерсов поведение пробуждения дочерних фьючерсов имеет сложность O(N²) (квадратичную). Это означает, что если мы запланируем 1_000 фьючерсов, которые все пробуждаются независимо и последовательно, мы получим приблизительно 1_000_000 общих пробуждений1.
  2. С аллокациями фьючерсы могут иметь поведение пробуждения дочерних фьючерсов со сложностью O(N). Это означает, что если мы запланируем 1_000 фьючерсов, которые все завершаются независимо и последовательно, у нас будет 1_000 общих пробуждений.

В Rust мы хорошо знаем, что скрытые аллокации — это плохо. Но мы также знаем, что квадратичный рост при потенциально неограниченном количестве элементов — это тоже плохо. И плохо то, что мы оказались в ситуации, где должны выбирать между двумя плохими вариантами.

Эффективное конкурентное выполнение требует промежуточных Waker'ов

Допустим, мы пишем вручную функцию future::join, которая принимает массив impl Future и возвращает массив Future::Output. Мы можем смоделировать внутреннее состояние, используя массив фьючерсов и массив их выходных данных:

#![allow(unused)]
fn main() {
/// Фьючерс, который конкурентно ожидает список фьючерсов
struct Join<Fut: Future, const N: usize> {
    /// Фьючерсы, которые мы ожидаем.
    futures: [Fut; N],
    /// Выходные данные фьючерсов.
    outputs: [Option<Fut::Output>; N],
}
}

Чтобы конкурентно ожидать все фьючерсы, мы должны реализовать трейт Future для Join. Его выходной тип — [Future::Output; N], и каждый вызов poll должен перебирать каждый ожидающий внутренний фьючерс. Вот наивный пример реализации, в основном просто для контекста:

#![allow(unused)]
fn main() {
impl<Fut: Future, const N: usize> Future for Join<Fut, N> {
    type Output = [Fut::Output; N];

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Настройка начального состояния
        let this = unsafe { self.get_unchecked_mut() };
        let mut all_done = true;

        // Перебираем каждый фьючерс, пока не получим все
        // значения или пока у нас остаются незавершённые фьючерсы.
        for i in 0..this.futures.len() {
            // Либо фьючерс уже завершён...
            if this.outputs[i].is_some() {
                continue;
            }

            // ...либо нам всё ещё нужно его опросить.
            let fut = unsafe { Pin::new_unchecked(&mut this.futures[i]) };
            match fut.poll(cx) {
                Poll::Ready(output) => this.outputs[i] = Some(output),
                Poll::Pending => all_done = false,
            }
        }

        // Цикл завершён, пора посмотреть на наши
        // результаты. Либо у нас всё ещё есть незавершённые фьючерсы...
        if !all_done {
            return Poll::Pending;
        }

        // ...либо всё завершено, и мы готовы вернуть результат.
        let outputs = std::mem::replace(&mut this.outputs, [None; N])
            .map(Option::unwrap);

        Poll::Ready(outputs)
    }
}
}

Если посмотреть на суть этого цикла, можно заметить, что мы перебираем каждый фьючерс на каждой итерации — независимо от того, вызывал ли он свой соответствующий waker или нет. Единственные фьючерсы, которые мы не вызываем, — те, чей вывод мы уже получили. Практически это означает, что мы приближаемся к выполнению O(N²): мы пробуждаем каждый фьючерс при каждом вызове poll. И мы делаем это, потому что не знаем, какие фьючерсы готовы, а какие нет.

Решение здесь — создать «промежуточный» или «встроенный» waker. Вместо того чтобы передавать waker вызывающего кода во все дочерние фьючерсы, фьючерс Join должен создать свой собственный waker(ы), который может отслеживать, какие дочерние фьючерсы готовы, — и опрашивать только их. Самый простой способ сделать это — хранить массив waker'ов и массив «готовых» индексов. Всякий раз, когда дочерний фьючерс вызывает wake, он добавляет свой индекс в список готовых индексов, а затем вызывает родительский waker. Затем, когда опрашивается фьючерс Join, всё, что ему нужно сделать, — это перебрать все готовые индексы и опросить соответствующие фьючерсы. Для краткости я не буду показывать полную реализацию, только само определение:

#![allow(unused)]
fn main() {
/// Фьючерс, который конкурентно ожидает список фьючерсов
struct Join<Fut: Future, const N: usize> {
    /// Фьючерсы, которые мы ожидаем.
    futures: [Fut; N],
    /// Выходные данные фьючерсов.
    outputs: [Option<Fut::Output>; N],
    /// Хранит по одному waker'у для каждого дочернего фьючерса
    wakers: [ChildWaker; N],
    /// Отслеживает, какие дочерние фьючерсы должны быть пробуждены
    indexes: Arc<[AtomicBool; N]>,
}

/// Waker, связанный с конкретным дочерним фьючерсом
struct ChildWaker {
    /// Индекс дочернего фьючерса в списке индексов
    index: usize,
    /// Ссылка на родительский waker, переданный в структуру
    parent_waker: Waker,
    /// Отслеживает, какие дочерние фьючерсы должны быть пробуждены.
    /// Предназначено для индексирования с помощью `index` внутри `Wake`.
    indexes: Arc<[AtomicBool; N]>,
}
impl Wake for InlineWaker { ... }
}

Существуют более эффективные схемы, чем эта, хотя она одна из простейших. И что критически важно: это позволяет нам пробуждать только те фьючерсы, которые нуждаются в пробуждении. Это важно, поскольку пробуждение фьючерсов потенциально может быть дорогой операцией. И хотя мы можем поощрять создание фьючерсов, которые дёшево пробуждать спонтанно, — нам действительно не стоит вызывать лишние пробуждения с самого начала. Потому что лучше, если это бремя ляжет на экспертов по async, пишущих примитивы, чем на каждого отдельного человека, реализующего трейт Future2.

Промежуточные Waker'ы требуют аллокаций

Теперь, когда мы рассмотрели, почему фьючерсы хотят использовать промежуточные waker'ы, давайте внимательнее посмотрим на их внутреннее устройство. Трейт Future опирается на тип Waker, который можно рассматривать как указатель Arc<Fn>. Где Arc можно рассматривать как улучшенный тип Box<T>, который можно свободно клонировать и который предоставляет общий (неизменяемый) доступ к своим внутренностям. Вот как можно определить новый Waker, который будет разблокировать текущий поток при вызове:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::task::{Context, Wake};
use std::thread::{self, Thread};
use std::pin::pin;

/// Waker, который пробуждает текущий поток при вызове.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}
}

А вот как создать экземпляр этого же waker'а. Это опирается на реализацию From<Arc<Wake + Send + Sync + 'static>> для Waker:

#![allow(unused)]
fn main() {
/// Выполняет фьючерс до завершения в текущем потоке.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);

    // Создаём экземпляр `ThreadWaker`
    let t = thread::current();
    let waker = Arc::new(ThreadWaker(t)).into(); // ← Это аллокация

    // ...
}

Вот именно, создание экземпляра с помощью Arc::new означает, что мы выделяем память. В теории нам не обязательно использовать Arc и для этого. Тип Waker требует только, чтобы он оборачивал RawWakerVTable способом, который является Send + Sync + Clone. Это значит, что теоретически мы могли бы где-то иметь статическую переменную с внутренней изменяемостью, и, возможно, некоторые (встроенные) системы полагаются на это для своих глобальных рантаймов. Но на практике для встроенных конкурентных операций, если мы собираемся создавать Waker, он должен будет быть основан на Arc.

Заключение

Тот факт, что это практически требует аллокаций, — не единственная проблема: в однопоточных системах Waker всё равно требует ограничений + Send + Sync. Даже если бы мы в итоге добавили тип LocalWaker, исходный тип Waker всё равно должен присутствовать. Это означает, что мы не можем практически писать переносимые асинхронные библиотеки !Send, потому что это ограничение никогда не появится в системе типов3.

Причина, по которой у нас есть Arc<Fn> с самого начала, в том, что дизайн фьючерсов предполагает, что мы можем захотеть обернуть произвольные вычисления в waker'ы. Но я не уверен, насколько это верно. Например, в библиотеке futures-concurrency — наши waker'ы выполняют только операцию, эквивалентную переключению атомарного булевого значения (src). Если бы мы вручную писали свою собственную реализацию напрямую, например, против API epoll, нам не пришлось бы платить такую цену. То, что мы рассмотрели в этом посте, — чисто артефакт дизайна трейта Future.

Необходимость выбирать между алгоритмической сложностью и аллокациями — также причина, по которой я ещё не предложил перенести API futures-concurrency в стандартную библиотеку. В стандартной библиотеке Rust есть принцип дизайна, запрещающий скрытые аллокации. Но на работе мы также знаем, что нам нужно защищаться от неправильного использования API. Это означает, что мы, скорее всего, столкнёмся со случаями, когда использование API stdlib на самом деле не рекомендуется. И сложно оправдать перенос в основную ветку чего-то, от использования чего мы, возможно, сразу захотим предостеречь.

Это также не единственная проблема с дизайном трейта Future. Я подробно говорил о проблемах, связанных с API Pin. И в этом посте я также затронул проблему !Send Waker'а. На мой взгляд, у трейта Future есть семь таких проблем, которые я постараюсь описать в будущих постах.

Примечания


  1. Это не просто гипотетическая ситуация — это проблема, с которой мы столкнулись в Microsoft. Мы не можем разумно ожидать, что каждый человек, реализующий трейт Future, будет экспертом по async. И поэтому нам нужны примитивы, которые не просто перекладывают ответственность за производительность на отдельных пользователей.

  2. Если быть точным — вероятно, около половины, поскольку фьючерсы, которые уже завершились, больше не опрашиваются.

  3. Надеюсь, мне не нужно объяснять, что наличие Send, выраженного в системе типов, — это хорошая вещь.