Макрос select
#![allow(unused)] fn main() { macro_rules! select { { $( biased; )? $( $bind:pat = $fut:expr $(, if $cond:expr)? => $handler:expr, )* $( else => $els:expr $(,)? )? } => { ... }; } }
Доступно только с флагом функции macros.
Ожидает завершения нескольких конкурентных ветвей, возвращая результат, когда первая ветвь завершается, отменяя оставшиеся ветви.
Описание
Макрос select! должен использоваться внутри асинхронных функций, замыканий и блоков.
Макрос select! принимает одну или несколько ветвей со следующим шаблоном:
<шаблон> = <асинхронное выражение> (, if <предусловие>)? => <обработчик>,
Дополнительно макрос select! может включать единственную необязательную ветвь else, которая выполняется, если ни одна из других ветвей не соответствует своим шаблонам:
else => <выражение>
Макрос агрегирует все выражения <асинхронное выражение> и выполняет их конкурентно в текущей задаче. Как только первое выражение завершается со значением, соответствующим его <шаблону>, макрос select! возвращает результат вычисления выражения <обработчик> завершенной ветви.
Кроме того, каждая ветвь может включать необязательное предусловие if. Если предусловие возвращает false, то ветвь отключается. Предоставленное <асинхронное выражение> все равно вычисляется, но результирующий future никогда не опрашивается. Эта возможность полезна при использовании select! в цикле.
Полный жизненный цикл выражения select! выглядит следующим образом:
- Вычислить все предоставленные выражения
<предусловие>. Если предусловие возвращаетfalse, отключить ветвь до конца текущего вызоваselect!. Повторный вход вselect!из-за цикла сбрасывает состояние "отключено". - Агрегировать
<асинхронное выражение>из каждой ветви, включая отключенные. Если ветвь отключена,<асинхронное выражение>все равно вычисляется, но результирующий future не опрашивается. - Если все ветви отключены: перейти к шагу 6.
- Конкурентно ожидать результаты всех оставшихся
<асинхронное выражение>. - Как только
<асинхронное выражение>возвращает значение, попытаться применить значение к предоставленному<шаблону>. Если шаблон совпадает, вычислить<обработчик>и вернуть результат. Если шаблон не совпадает, отключить текущую ветвь до конца текущего вызоваselect!. Продолжить с шага 3. - Вычислить выражение
else. Если выражениеelseне предоставлено, вызвать панику.
Характеристики времени выполнения
Поскольку все асинхронные выражения выполняются в текущей задаче, выражения могут выполняться конкурентно, но не параллельно. Это означает, что все выражения выполняются в одном потоке, и если одна ветвь блокирует поток, все другие выражения не смогут продолжить выполнение. Если требуется параллелизм, порождайте каждое асинхронное выражение с помощью tokio::spawn и передавайте дескриптор соединения в select!.
Честность (Fairness)
По умолчанию select! случайным образом выбирает ветвь для проверки первой. Это обеспечивает некоторый уровень честности при вызове select! в цикле с ветвями, которые всегда готовы.
Это поведение можно переопределить, добавив biased; в начало использования макроса. Это заставит select опрашивать future в порядке их следования сверху вниз. Есть несколько причин, по которым это может понадобиться:
- Генерация случайных чисел в
tokio::select!имеет ненулевую стоимость CPU - Ваши future могут взаимодействовать таким образом, что известный порядок опроса имеет значение
Но у этого режима есть важное предостережение. Вы сами отвечаете за обеспечение справедливого порядка опроса ваших future. Если, например, вы выбираете между потоком и future завершения, и поток имеет огромный объем сообщений и нулевое или почти нулевое время между ними, вам следует поместить future завершения раньше в списке select!, чтобы гарантировать, что он всегда будет опрашиваться и не будет проигнорирован из-за того, что поток постоянно готов.
Паника
Макрос select! вызывает панику, если все ветви отключены и нет предоставленной ветви else. Ветвь отключается, когда предоставленное предусловие if возвращает false или когда шаблон не соответствует результату <асинхронное выражение>.
Безопасность отмены (Cancellation Safety)
При использовании select! в цикле для получения сообщений из нескольких источников следует убедиться, что вызов получения безопасен при отмене, чтобы избежать потери сообщений. В этом разделе рассматриваются различные распространенные методы и описывается, являются ли они безопасными при отмене. Списки в этом разделе не являются исчерпывающими.
Следующие методы безопасны при отмене:
tokio::sync::mpsc::Receiver::recvtokio::sync::mpsc::UnboundedReceiver::recvtokio::sync::broadcast::Receiver::recvtokio::sync::watch::Receiver::changedtokio::net::TcpListener::accepttokio::net::UnixListener::accepttokio::signal::unix::Signal::recvtokio::io::AsyncReadExt::readна любомAsyncReadtokio::io::AsyncReadExt::read_bufна любомAsyncReadtokio::io::AsyncWriteExt::writeна любомAsyncWritetokio::io::AsyncWriteExt::write_bufна любомAsyncWritetokio_stream::StreamExt::nextна любомStreamfutures::stream::StreamExt::nextна любомStream
Следующие методы не безопасны при отмене и могут привести к потере данных:
tokio::io::AsyncReadExt::read_exacttokio::io::AsyncReadExt::read_to_endtokio::io::AsyncReadExt::read_to_stringtokio::io::AsyncWriteExt::write_all
Следующие методы не безопасны при отмене, потому что они используют очередь для честности, и отмена заставляет вас потерять свое место в очереди:
tokio::sync::Mutex::locktokio::sync::RwLock::readtokio::sync::RwLock::writetokio::sync::Semaphore::acquiretokio::sync::Notify::notified
Чтобы определить, являются ли ваши собственные методы безопасными при отмене, ищите места использования .await. Это связано с тем, что когда асинхронный метод отменяется, это всегда происходит в .await. Если ваша функция ведет себя корректно, даже если она перезапускается во время ожидания в .await, то она безопасна при отмене.
Безопасность отмены можно определить следующим образом: если у вас есть future, который еще не завершился, то удаление этого future и его повторное создание должно быть no-op операцией. Это определение мотивировано ситуацией, когда select! используется в цикле. Без этой гарантии вы потеряете свой прогресс, когда другая ветвь завершится и вы перезапустите select!, пройдя по циклу.
Имейте в виду, что отмена чего-либо, что не является безопасным при отмене, не обязательно является ошибкой. Например, если вы отменяете задачу, потому что приложение завершает работу, то вам, вероятно, все равно, что частично прочитанные данные будут потеряны.
Примеры
Базовый select с двумя ветвями
#![allow(unused)] fn main() { async fn do_stuff_async() { // асинхронная работа } async fn more_async_work() { // больше асинхронной работы } tokio::select! { _ = do_stuff_async() => { println!("do_stuff_async() завершился первым") } _ = more_async_work() => { println!("more_async_work() завершился первым") } }; }
Базовый выбор между потоками
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; let mut stream1 = stream::iter(vec![1, 2, 3]); let mut stream2 = stream::iter(vec![4, 5, 6]); let next = tokio::select! { v = stream1.next() => v.unwrap(), v = stream2.next() => v.unwrap(), }; assert!(next == 1 || next == 4); }
Сбор содержимого двух потоков
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; let mut stream1 = stream::iter(vec![1, 2, 3]); let mut stream2 = stream::iter(vec![4, 5, 6]); let mut values = vec![]; loop { tokio::select! { Some(v) = stream1.next() => values.push(v), Some(v) = stream2.next() => values.push(v), else => break, } } values.sort(); assert_eq!(&[1, 2, 3, 4, 5, 6], &values[..]); }
Использование с таймаутом
#![allow(unused)] fn main() { use tokio_stream::{self as stream, StreamExt}; use tokio::time::{self, Duration}; let mut stream = stream::iter(vec![1, 2, 3]); let sleep = time::sleep(Duration::from_secs(1)); tokio::pin!(sleep); loop { tokio::select! { maybe_v = stream.next() => { if let Some(v) = maybe_v { println!("получено = {}", v); } else { break; } } _ = &mut sleep => { println!("таймаут"); break; } } } }
Объединение двух значений с использованием select!
#![allow(unused)] fn main() { use tokio::sync::oneshot; let (tx1, mut rx1) = oneshot::channel(); let (tx2, mut rx2) = oneshot::channel(); tokio::spawn(async move { tx1.send("first").unwrap(); }); tokio::spawn(async move { tx2.send("second").unwrap(); }); let mut a = None; let mut b = None; while a.is_none() || b.is_none() { tokio::select! { v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()), v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()), } } let res = (a.unwrap(), b.unwrap()); assert_eq!(res.0, "first"); assert_eq!(res.1, "second"); }
Использование режима biased; для контроля порядка опроса
#![allow(unused)] fn main() { let mut count = 0u8; loop { tokio::select! { // Если запустить этот пример без `biased;`, порядок опроса // псевдослучайный, и проверки значения count (вероятно) провалятся. biased; _ = async {}, if count < 1 => { count += 1; assert_eq!(count, 1); } _ = async {}, if count < 2 => { count += 1; assert_eq!(count, 2); } _ = async {}, if count < 3 => { count += 1; assert_eq!(count, 3); } _ = async {}, if count < 4 => { count += 1; assert_eq!(count, 4); } else => { break; } }; } }
Избегание гонок в предусловиях if
Поскольку предусловия if используются для отключения ветвей select!, следует проявлять осторожность, чтобы избежать пропуска значений.
Например, вот неправильное использование sleep с if. Цель - повторно выполнять асинхронную задачу до 50 миллисекунд. Однако существует вероятность пропуска завершения сна.
ⓘ
#![allow(unused)] fn main() { use tokio::time::{self, Duration}; async fn some_async_work() { // выполняем работу } let sleep = time::sleep(Duration::from_millis(50)); tokio::pin!(sleep); while !sleep.is_elapsed() { tokio::select! { _ = &mut sleep, if !sleep.is_elapsed() => { println!("операция превысила время ожидания"); } _ = some_async_work() => { println!("операция завершена"); } } } }
В приведенном выше примере sleep.is_elapsed() может возвращать true, даже если sleep.poll() никогда не возвращал Ready. Это создает потенциальное состояние гонки, когда sleep истекает между проверкой while !sleep.is_elapsed() и вызовом select!, в результате чего вызов some_async_work() выполняется беспрепятственно, несмотря на истечение времени сна.
Один из способов написать приведенный выше пример без гонки:
#![allow(unused)] fn main() { use tokio::time::{self, Duration}; async fn some_async_work() { // выполняем работу } let sleep = time::sleep(Duration::from_millis(50)); tokio::pin!(sleep); loop { tokio::select! { _ = &mut sleep => { println!("операция превысила время ожидания"); break; } _ = some_async_work() => { println!("операция завершена"); } } } }
Альтернативы из экосистемы
Макрос select! - это мощный инструмент для управления несколькими асинхронными ветвями, позволяющий задачам выполняться конкурентно в одном потоке. Однако его использование может создавать сложности, особенно связанные с безопасностью отмены, что может привести к трудноуловимым и сложным для отладки ошибкам. Для многих случаев использования альтернативы из экосистемы могут быть предпочтительнее, поскольку они смягчают эти проблемы, предлагая более четкий синтаксис, более предсказуемое управление потоком и уменьшая необходимость ручного решения таких проблем, как семантика предохранителей или безопасность отмены.
Объединение потоков (Merging Streams)
Для случаев, когда loop { select! { ... } } используется для опроса нескольких задач, объединение потоков предлагает краткую альтернативу, которая по своей природе обрабатывает безопасную при отмене обработку, устраняя риск потери данных. Библиотеки, такие как tokio_stream, futures::stream и futures_concurrency, предоставляют инструменты для объединения потоков и последовательной обработки их выходных данных.
Гонка Future (Racing Futures)
Если вам нужно дождаться первого завершения среди нескольких асинхронных задач, утилиты экосистемы, такие как futures, futures-lite или futures-concurrency, предоставляют упрощенный синтаксис для гонки future:
futures_concurrency::future::Racefutures::selectfutures::stream::select_all(для потоков)futures_lite::future::orfutures_lite::future::race
#![allow(unused)] fn main() { use futures_concurrency::future::Race; let task_a = async { Ok("ok") }; let task_b = async { Err("error") }; let result = (task_a, task_b).race().await; match result { Ok(output) => println!("Первая задача завершилась с: {output}"), Err(err) => eprintln!("Произошла ошибка: {err}"), } }