Композиция фьючерсов конкурентно

В этой главе мы рассмотрим дополнительные способы компоновки фьючерсов. В частности, новые способы, позволяющие выполнять фьючерсы конкурентно (но не параллельно). Поверхностно, новые функции/макросы, которые мы представим в этой главе, довольно просты. Однако лежащие в их основе концепции могут быть довольно тонкими. Мы начнем с повторения фьючерсов, конкурентности и параллелизма, но вам также может быть полезно перечитать более ранний раздел, сравнивающий конкурентность с параллелизмом.

Фьючерс — это отложенное вычисление. Фьючерс можно продвинуть с помощью await, который передает управление среде выполнения, заставляя текущую задачу ждать результата вычисления. Если a и b — фьючерсы, то их можно скомпоновать последовательно (то есть объединить в фьючерс, который выполняет a до завершения, а затем b до завершения), awaitя сначала один, потом другой: async { a.await; b.await}.

Мы также видели параллельную композицию фьючерсов с помощью spawn: async { let a = spawn(a); let b = spawn(b); (a.await, b.await)} запускает два фьючерса параллельно. Обратите внимание, что await в кортеже ожидают не сами фьючерсы, а JoinHandle, чтобы получить результаты фьючерсов, когда они завершатся.

В этой главе мы представляем два способа компоновки фьючерсов конкурентно без параллелизма: join и select/race. В обоих случаях фьючерсы выполняются конкурентно за счет разделения времени; каждый из составленных фьючерсов по очереди выполняется, затем наступает очередь следующего. Это делается без привлечения асинхронной среды выполнения (и, следовательно, без нескольких потоков ОС и без какого-либо потенциала для параллелизма). Конструкция композиции локально чередует выполнение фьючерсов. Вы можете думать об этих конструкциях как о мини-исполнителях, которые выполняют свои составные фьючерсы в рамках одной асинхронной задачи.

Фундаментальное различие между join и select/race заключается в том, как они обрабатывают завершение работы фьючерсов: join завершается, когда все фьючерсы завершаются, а select/race завершается, когда один фьючерс завершается (все остальные отменяются). Также существуют вариации обоих для обработки ошибок.

Эти конструкции (или аналогичные концепции) часто используются с потоками (streams), мы кратко коснемся этого ниже, но подробнее поговорим об этом в главе о потоках.

Если вы хотите параллелизма (или вы не explicitly не хотите его), порождение задач часто является более простой альтернативой этим конструкциям композиции. Порождение задач обычно менее подвержено ошибкам, более универсально, и производительность более предсказуема. С другой стороны, порождение по своей природе менее структурировано, что может затруднить рассуждения о жизненном цикле и управлении ресурсами.

Стоит рассмотреть проблему производительности немного глубже. Потенциальная проблема производительности при конкурентной композиции — это справедливость распределения времени. Если у вас есть 100 задач в вашей программе, то обычно оптимальный способ распределения ресурсов — предоставить каждой задаче 1% процессорного времени (или, если все задачи ожидают, то дать каждой одинаковый шанс быть разбуженной). Если вы порождаете 100 задач, то обычно это примерно так и происходит. Однако если вы породите две задачи и соедините (join) 99 фьючерсов на одной из этих задач, то планировщик будет знать только о двух задачах, и одна задача получит 50% времени, а 99 фьючерсов получат по 0.5% каждый.

Обычно распределение задач не настолько смещено, и очень часто мы используем join/select/и т.д. для таких вещей, как таймауты, где такое поведение фактически желательно. Но стоит учитывать это, чтобы убедиться, что ваша программа имеет желаемые характеристики производительности.

Join (Соединение)

Макрос join в Tokio принимает список фьючерсов и запускает их все до завершения конкурентно (возвращая все результаты в виде кортежа). Он возвращается, когда все фьючерсы завершены. Фьючерсы всегда выполняются в одном и том же потоке (конкурентно, а не параллельно).

Вот простой пример:

async fn main() {
  let (result_1, result_2) = join!(do_a_thing(), do_a_thing());
  // Используйте `result_1` и `result_2`.
}

Здесь два выполнения do_a_thing происходят конкурентно, и результаты готовы, когда оба завершены. Обратите внимание, что мы не используем await для получения результатов. join! неявно ожидает свои фьючерсы и производит значение. Он не создает фьючерс. Вам все еще нужно использовать его внутри асинхронного контекста (например, внутри асинхронной функции).

Хотя вы не видите этого в примере выше, join! принимает выражения, которые вычисляются в фьючерсы1. join не создает асинхронный контекст в своем теле, и вам не следует awaitить фьючерсы, передаваемые в join (иначе они будут вычислены до того, как присоединенные фьючерсы начнут выполняться).

Поскольку все фьючерсы выполняются в одном потоке, если какой-либо фьючерс блокирует поток, то ни один из них не может прогрессировать. При использовании мьютекса или другой блокировки это может легко привести к взаимоблокировке (deadlock), если один фьючерс ждет блокировку, удерживаемую другим фьючерсом.

join не заботится о результате фьючерсов. В частности, если фьючерс отменен или возвращает ошибку, это не влияет на другие — они продолжают выполняться. Если вы хотите поведения «быстроого отказа» (fail fast), используйте try_join. try_join работает аналогично join, однако, если любой фьючерс возвращает Err, то все остальные фьючерсы отменяются, и try_join немедленно возвращает ошибку.

Еще в предыдущей главе об async/await мы использовали слово «join» для описания соединения порожденных задач. Как следует из названия, соединение фьючерсов и задач связано: соединение означает, что мы выполняем несколько фьючерсов конкурентно и ждем результат, прежде чем продолжить. Синтаксис отличается: использование JoinHandle против макроса join, но идея схожа. Ключевое различие заключается в том, что при соединении задач задачи выполняются конкурентно и параллельно, тогда как при использовании join! фьючерсы выполняются конкурентно, но не параллельно. Более того, порожденные задачи планируются планировщиком среды выполнения, тогда как с join! фьючерсы «планируются» локально (в той же задаче и в пределах временной области выполнения макроса). Другое отличие состоит в том, что если порожденная задача паникует, паника перехватывается средой выполнения, но если фьючерс в join паникует, то паникует вся задача.

Альтернативы

Конкурентный запуск фьючерсов и сбор их результатов — это распространенное требование. Вам, вероятно, следует использовать spawn и JoinHandle, если у вас нет веской причины не делать этого (т.е. вы явно не хотите параллелизма, и даже тогда вы можете предпочесть spawn_local). Абстракция JoinSet управляет такими порожденными задачами способом, аналогичным join!.

Большинство сред выполнения (и futures.rs) имеют эквивалент макроса join от Tokio, и они в основном ведут себя одинаково. Также существуют функции join, которые похожи на макрос, но немного менее гибки. Например, futures.rs имеет join для соединения двух фьючерсов, join3, join4 и join5 для соединения очевидного количества фьючерсов, и join_all для соединения коллекции фьючерсов (а также try_ вариации каждого из них).

Futures-concurrency также предоставляет функциональность для join (и try_join). В стиле futures-concurrency эти операции являются методами трейта для групп фьючерсов, таких как кортежи, Vec или массивы. Например, чтобы соединить два фьючерса, вы бы написали (fut1, fut2).join().await (обратите внимание, что await здесь явный).

Если набор фьючерсов, которые вы хотите соединить, изменяется динамически (например, новые фьючерсы создаются по мере поступления входных данных по сети), или вы хотите получать результаты по мере их завершения, а не когда все фьючерсы завершатся, то вам нужно будет использовать потоки и функциональность FuturesUnordered или FuturesOrdered. Мы рассмотрим их в главе о потоках.

Race/Select (Гонка/Выбор)

Аналогом соединения фьючерсов является их гонка (также known как выбор). При гонке/выборе фьючерсы выполняются конкурентно, но вместо того, чтобы ждать завершения всех фьючерсов, мы ждем только завершения первого, а затем отменяем остальные. Хотя это звучит похоже на соединение, это значительно интереснее (и иногда чревато ошибками), потому что теперь нам приходится рассуждать об отмене.

Вот пример использования макроса select от Tokio:

async fn main() {
  select! {
    result = do_a_thing() => {
      println!("computation completed and returned {result}");
    }
    _ = timeout() => {
      println!("computation timed-out");
    }
  }
}

Вы заметите, что все уже интереснее, чем с макросом join, потому что мы обрабатываем результаты фьючерсов внутри макроса select. Это немного похоже на выражение match, но с select все ветви выполняются конкурентно, и тело ветви, которая завершается первой, выполняется с ее результатом (другие ветви не выполняются, и фьючерсы отменяются путем dropа). В примере do_a_thing и timeout выполняются конкурентно, и первый завершившийся будет иметь свой блок выполненным (т.е. только один println запустится), другой фьючерс будет отменен. Как и в макросе join, ожидание фьючерсов неявное.

Макрос select от Tokio поддерживает кучу функций:

  • Сопоставление с образцом (pattern matching): синтаксис слева от = в каждой ветви может быть образцом, и блок выполняется только если результат фьючерса соответствует образцу. Если образец не совпадает, то фьючерс больше не опрашивается (но другие фьючерсы — да). Это может быть полезно для фьючерсов, которые опционально возвращают значение, например, Some(x) = do_a_thing() => { ... }.
  • Охранники if: каждая ветвь может иметь охранник if. Когда макрос select запускается, после вычисления каждого выражения для получения фьючерса, вычисляется охранник if, и фьючерс опрашивается только если охранитель истинен. Например, x = do_a_thing() if false => { ... } никогда не будет опрошен. Обратите внимание, что охранник if не перевычисляется во время опроса, только при инициализации макроса.
  • Ветвь else: select может иметь ветвь else else => { ... }, она выполняется, если все фьючерсы остановились и ни один из блоков не был выполнен. Если это происходит без ветви else, то select запаникует.

Значение макроса select! — это значение выполненной ветви (как и в match), поэтому все ветви должны иметь одинаковый тип. Например, если бы мы захотели использовать результат приведенного выше примера вне select, мы бы написали это так:

async fn main() {
  let result = select! {
    result = do_a_thing() => {
      Some(result)
    }
    _ = timeout() => {
      None
    }
  };

  // Используйте `result`
}

Как и в случае с join!, select! не обрабатывает Result каким-либо особым образом (кроме упомянутого ранее сопоставления с образцом), и если ветвь завершается с ошибкой, то все другие ветви отменяются, и ошибка используется как результат select (так же, как если бы ветвь завершилась успешно).

Макрос select по своей сути использует отмену, поэтому если вы пытаетесь избежать отмены в вашей программе, вы должны избегать select!. Фактически, select часто является основным источником отмены в асинхронной программе. Как обсуждалось в другом месте, отмена имеет множество тонких проблем, которые могут привести к ошибкам. В частности, обратите внимание, что select отменяет фьючерсы, просто удаляя их. Это не уведомит удаляемый фьючерс и не активирует какие-либо токены отмены и т.д.

select! часто используется в цикле для обработки потоков или других последовательностей фьючерсов. Это добавляет дополнительный уровень сложности и возможностей для ошибок. В простом случае, когда мы создаем новый, независимый фьючерс на каждой итерации цикла, все не намного сложнее. Однако это редко то, что нужно. Обычно мы хотим сохранить некоторое состояние между итерациями. Распространено использование select в цикле с потоками, где каждая итерация цикла обрабатывает один результат из потока. Например:

async fn main() {
  let mut stream = ...;

  loop {
    select! {
      result = stream.next() => {
        match result {
          Some(x) => println!("received: {x}"),
          None => break,
        }
      }
      _ = timeout() => {
        println!("time out!");
        break;
      }
    }
  }
}

В этом примере мы читаем значения из stream и печатаем их, пока они не закончатся или ожидание результата не превысит таймаут. Что произойдет с любыми оставшимися данными в потоке в случае таймаута, зависит от реализации потока (они могут быть потеряны! Или продублированы!). Это пример того, почему поведение при отмене может быть важным (и сложным).

Мы можем захотеть повторно использовать фьючерс, а не только поток, across итераций. Например, мы можем захотеть соревноваться с фьючерсом таймаута, где таймаут применяется ко всем итерациям, а не устанавливает новый таймаут для каждой итерации. Это возможно путем создания фьючерса вне цикла и ссылки на него:

async fn main() {
  let mut stream = ...;
  let mut timeout = timeout();

  loop {
    select! {
      result = stream.next() => {
        match result {
          Some(x) => println!("received: {x}"),
          None => break,
        }
      }
      // Создаем ссылку на `timeout`, а не перемещаем его.
      _ = &mut timeout => {
        println!("time out!");
        break;
      }
    }
  }
}

Есть несколько важных деталей при использовании select! в цикле с фьючерсами или потоками, созданными вне select!. Они являются фундаментальным следствием того, как работает select, поэтому я представлю их, подробно разобрав select, используя timeout в последнем примере в качестве примера.

  • timeout создается вне цикла и инициализируется некоторым временем для обратного отсчета.
  • На каждой итерации цикла select создает ссылку на timeout, но не меняет его состояние.
  • По мере выполнения select опрашивает timeout, который будет возвращать Pending, пока время не истекло, и Ready, когда время истекает, после чего его блок выполняется.

В приведенном выше примере, когда timeout готов, мы выходим из цикла с помощью break. Но что, если бы мы этого не сделали? В этом случае select просто снова опросил бы timeout, что, согласно документации Future, не должно происходить! select не может этого предотвратить, у него нет никакого состояния (между итерациями), чтобы решить, следует ли опрашивать timeout. В зависимости от того, как написан timeout, это может вызвать панику, логическую ошибку или какой-то сбой.

Вы можете предотвратить такого рода ошибки несколькими способами:

  • Используйте fused фьючерс или поток, чтобы повторный опрос был безопасным.
  • Убедитесь, что ваш код структурирован так, что фьючерсы никогда не переопрашиваются, например, выходя из цикла (как в предыдущем примере) или используя охранник if.

Теперь рассмотрим тип &mut timeout. Предположим, что timeout() возвращает тип, который реализует Future, который может быть анонимным типом из асинхронной функции или именованным типом, таким как Timeout. Предположим последнее, потому что это делает примеры проще (но логика применима в любом случае). Учитывая, что Timeout реализует Future, будет ли &mut Timeout реализовывать Future? Не обязательно! Существует blanket impl, который делает это истинным, но только если Timeout реализует Unpin. Это верно не для всех фьючерсов, поэтому часто вы получите ошибку типа, написав код, подобный последнему примеру. Такая ошибка легко исправляется с помощью макроса pin, например, let mut timeout = pin!(timeout());

Отмена с помощью select в цикле — это богатый источник тонких ошибок. Обычно они происходят, когда фьючерс содержит некоторое состояние, включающее некоторые данные, но не сами данные. Когда фьючерс удаляется из-за отмены, это состояние теряется, но лежащие в основе данные не обновляются. Это может привести к потере данных или их многократной обработке.

Альтернативы

Futures.rs имеет свой собственный макрос select, а futures-concurrency имеет трейт Race, которые являются альтернативами макросу select от Tokio. Они оба имеют ту же основную семантику конкурентной гонки нескольких фьючерсов, обработки результата первого и отмены остальных, но они имеют разный синтаксис и различаются в деталях.

select от futures.rs поверхностно похож на Tokio; чтобы обобщить различия, в версии futures.rs:

  • Фьючерсы всегда должны быть fused (обеспечивается проверкой типов).
  • select имеет ветви default и complete, а не ветвь else.
  • select не поддерживает охранники if.

Race от futures-concurrency имеет очень отличающийся синтаксис, похожий на его версию join, например, (future_a, future_b).race().await (он работает с Vec и массивами, а также с кортежами). Синтаксис менее гибкий, чем у макросов, но хорошо вписывается в большинство асинхронного кода. Обратите внимание, что если вы используете race внутри цикла, у вас все равно могут быть те же проблемы, что и с select.

Как и в случае с join, порождение задач и позволение им выполняться параллельно часто является хорошей альтернативой использованию select. Однако отмена оставшихся задач после завершения первой требует некоторой дополнительной работы. Это можно сделать с помощью каналов или токена отмены. В любом случае, отмена требует некоторого действия со стороны отменяемой задачи, что означает, что задача может выполнить некоторую очистку или другое graceful завершение.

Распространенное использование select (особенно внутри цикла) — работа с потоками. Существуют методы-комбинаторы потоков, которые могут заменить некоторые использования select. Например, merge в futures-concurrency является хорошей альтернативой для объединения нескольких потоков вместе.

Заключительные слова

В этом разделе мы говорили о двух способах запуска групп фьючерсов конкурентно. Соединение фьючерсов означает ожидание завершения их всех; выбор (также known как гонка) фьючерсов означает ожидание завершения первого. В отличие от порождения задач, эти композиции не используют параллелизм.

И join, и select работают с наборами фьючерсов, которые известны заранее (часто при написании программы, а не во время выполнения). Иногда фьючерсы, которые нужно скомпоновать, не известны заранее — фьючерсы должны добавляться в набор компонуемых фьючерсов по мере их выполнения. Для этого нам нужны потоки, которые имеют свои собственные операции композиции.

Стоит повторить, что хотя эти операторы композиции мощны и выразительны, часто проще и уместнее использовать задачи и порождение: параллелизм часто желателен, у вас меньше вероятность ошибок, связанных с отменой или блокировкой, и распределение ресурсов обычно более справедливо (или, по крайней мере, проще) и более предсказуемо.


  1. Выражения должны иметь тип, который реализует IntoFuture. Выражение вычисляется и преобразуется в фьючерс макросом. Т.е. им не обязательно вычисляться в фьючерс, а во что-то, что может быть преобразовано в фьючерс, но это довольно незначительное различие. Сами выражения вычисляются последовательно до выполнения любых результирующих фьючерсов.