Select

До сих пор, когда мы хотели добавить параллелизм в систему, мы создавали новую задачу. Теперь мы рассмотрим некоторые дополнительные способы параллельного выполнения асинхронного кода с Tokio.

tokio::select!

Макрос tokio::select! позволяет ожидать выполнения нескольких асинхронных вычислений и возвращает результат, когда завершается одно из вычислений.

Например:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Используются два oneshot-канала. Любой из каналов может завершиться первым. Оператор select! ожидает оба канала и связывает val со значением, возвращенным задачей. Когда либо tx1, либо tx2 завершаются, выполняется соответствующий блок.

Ветка, которая не завершается, отбрасывается. В примере вычисление ожидает oneshot::Receiver для каждого канала. oneshot::Receiver для канала, который еще не завершился, отбрасывается.

Отмена

В асинхронном Rust отмена выполняется путем отбрасывания future. Вспомним из "Асинхронность в глубине", что асинхронные операции Rust реализуются с использованием future, и future ленивы. Операция продолжается только тогда, когда future опрашивается. Если future отбрасывается, операция не может продолжаться, потому что все связанное состояние отбрасывается.

Тем не менее, иногда асинхронная операция будет создавать фоновые задачи или запускать другие операции, которые выполняются в фоне. Например, в приведенном выше примере создается задача для отправки сообщения обратно. Обычно задача выполняет некоторые вычисления для генерации значения.

Future или другие типы могут реализовывать Drop для очистки фоновых ресурсов. oneshot::Receiver Tokio реализует Drop, отправляя уведомление о закрытии половине Sender. Половина отправителя может получить это уведомление и прервать выполняющуюся операцию, отбросив ее.

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Вычисляем значение здесь
"wut".to_string()
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Выбираем между операцией и уведомлением
        // `closed()` oneshot-канала.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` отменена, задача
                // завершается и `tx1` отбрасывается.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Реализация Future

Чтобы лучше понять, как работает select!, давайте посмотрим, как могла бы выглядеть гипотетическая реализация Future. Это упрощенная версия. На практике select! включает дополнительную функциональность, такую как случайный выбор ветки для опроса первой.

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // используем tx1 и tx2
tx1.send("one").unwrap();
tx2.send("two").unwrap();

    MySelect {
        rx1,
        rx2,
    }.await;
}

Future MySelect содержит future из каждой ветки. Когда MySelect опрашивается, опрашивается первая ветка. Если она готова, значение используется и MySelect завершается. После того как .await получает вывод из future, future отбрасывается. Это приводит к отбрасыванию future для обеих веток. Поскольку одна ветка не завершилась, операция эффективно отменяется.

Помните из предыдущего раздела:

Когда future возвращает Poll::Pending, он должен обеспечить, чтобы waker был сигнализирован в какой-то момент в будущем. Забывание этого приводит к зависанию задачи на неопределенное время.

В реализации MySelect нет явного использования аргумента Context. Вместо этого требование waker выполняется путем передачи cx внутренним future. Поскольку внутренний future также должен удовлетворять требованию waker, возвращая Poll::Pending только при получении Poll::Pending от внутреннего future, MySelect также удовлетворяет требованию waker.

Синтаксис

Макрос select! может обрабатывать более двух веток. Текущий предел - 64 ветки. Каждая ветка структурирована как:

<pattern> = <async expression> => <handler>,

Когда макрос select вычисляется, все <async expression> агрегируются и выполняются параллельно. Когда выражение завершается, результат сопоставляется с <pattern>. Если результат соответствует шаблону, то все оставшиеся асинхронные выражения отбрасываются и выполняется <handler>. Выражение <handler> имеет доступ к любым привязкам, установленным <pattern>.

Базовый случай для <pattern> - это имя переменной, результат асинхронного выражения привязывается к имени переменной, и <handler> имеет доступ к этой переменной. Вот почему в исходном примере val использовался для <pattern>, и <handler> мог получить доступ к val.

Если <pattern> не соответствует результату асинхронного вычисления, то оставшиеся асинхронные выражения продолжают выполняться параллельно до завершения следующего. В этот момент та же логика применяется к этому результату.

Поскольку select! принимает любое асинхронное выражение, можно определить более сложные вычисления для выбора.

Здесь мы выбираем между выводом oneshot-канала и TCP-соединением.

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Создаем задачу, которая отправляет сообщение через oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

Здесь мы выбираем между oneshot и принятием сокетов из TcpListener.

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Помогаем выводу типов Rust
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}
async fn process(_: tokio::net::TcpStream) {}

Цикл принятия выполняется до тех пор, пока не встретится ошибка или rx не получит значение. Шаблон _ указывает, что мы не заинтересованы в возвращаемом значении асинхронного вычисления.

Возвращаемое значение

Макрос tokio::select! возвращает результат вычисленного выражения <handler>.

async fn computation1() -> String {
    // .. вычисление
unimplemented!();
}

async fn computation2() -> String {
    // .. вычисление
unimplemented!();
}

fn dox() {
#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}
}

Из-за этого требуется, чтобы выражение <handler> для каждой ветки вычислялось в один и тот же тип. Если вывод выражения select! не нужен, хорошей практикой является вычисление выражения в ().

Ошибки

Использование оператора ? распространяет ошибку из выражения. Как это работает, зависит от того, используется ли ? из асинхронного выражения или из обработчика. Использование ? в асинхронном выражении распространяет ошибку из асинхронного выражения. Это делает вывод асинхронного выражения Result. Использование ? из обработчика немедленно распространяет ошибку из выражения select!. Давайте снова посмотрим на пример цикла принятия:

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [настройка `rx` oneshot-канала]
let (tx, rx) = oneshot::channel();
tx.send(()).unwrap();

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Помогаем выводу типов Rust
            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}
async fn process(_: tokio::net::TcpStream) {}

Обратите внимание на listener.accept().await?. Оператор ? распространяет ошибку из этого выражения и в привязку res. При ошибке res будет установлен в Err(_). Затем в обработчике оператор ? используется снова. Оператор res? будет распространять ошибку из функции main.

Сопоставление с образцом

Напомним, что синтаксис ветки макроса select! был определен как:

<pattern> = <async expression> => <handler>,

До сих пор мы использовали только привязки переменных для <pattern>. Однако можно использовать любой шаблон Rust. Например, скажем, мы получаем из нескольких MPSC-каналов, мы можем сделать что-то вроде этого:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Делаем что-то с `tx1` и `tx2`
tx1.send(1).await.unwrap();
tx2.send(2).await.unwrap();
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

В этом примере выражение select! ожидает получения значения из rx1 и rx2. Если канал закрывается, recv() возвращает None. Это не соответствует шаблону, и ветка отключается. Выражение select! будет продолжать ожидать оставшиеся ветки.

Обратите внимание, что это выражение select! включает ветку else. Выражение select! должно вычисляться в значение. При использовании сопоставления с образцом возможно, что ни одна из веток не соответствует своим связанным шаблонам. Если это происходит, вычисляется ветка else.

Заимствование

При создании задач создаваемое асинхронное выражение должно владеть всеми своими данными. Макрос select! не имеет этого ограничения. Каждое асинхронное выражение ветки может заимствовать данные и работать параллельно. Следуя правилам заимствования Rust, несколько асинхронных выражений могут неизменяемо заимствовать один фрагмент данных или одно асинхронное выражение может изменяемо заимствовать фрагмент данных.

Давайте рассмотрим несколько примеров. Здесь мы одновременно отправляем одни и те же данные в два разных TCP-назначения.

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}
fn main() {}

Переменная data заимствуется неизменяемо из обоих асинхронных выражений. Когда одна из операций завершается успешно, другая отбрасывается. Поскольку мы сопоставляем с образцом Ok(_), если выражение завершается неудачно, другое продолжает выполняться.

Когда дело доходит до <handler> каждой ветки, select! гарантирует, что выполняется только один <handler>. Из-за этого каждый <handler> может изменяемо заимствовать одни и те же данные.

Например, это изменяет out в обоих обработчиках:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Отправляем значения на `tx1` и `tx2`.
let _ = tx1.send("one");
let _ = tx2.send("two");
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

Циклы

Макрос select! часто используется в циклах. Этот раздел пройдет через некоторые примеры, чтобы показать общие способы использования макроса select! в цикле. Мы начнем с выбора из нескольких каналов:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);
tx1.clone().send("hello").await.unwrap();
drop((tx1, tx2, tx3));

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}

Этот пример выбирает из трех получателей каналов. Когда сообщение получено на любом канале, оно записывается в STDOUT. Когда канал закрывается, recv() возвращает None. Используя сопоставление с образцом, макрос select! продолжает ожидать оставшиеся каналы. Когда все каналы закрыты, вычисляется ветка else, и цикл завершается.

Макрос select! случайным образом выбирает ветки для проверки готовности первой. Когда несколько каналов имеют ожидающие значения, случайный канал будет выбран для получения. Это нужно для обработки случая, когда цикл приема обрабатывает сообщения медленнее, чем они помещаются в каналы, что означает, что каналы начинают заполняться. Если бы select! не выбирал случайным образом ветку для проверки первой, на каждой итерации цикла rx1 проверялась бы первой. Если rx1 всегда содержал новое сообщение, оставшиеся каналы никогда не проверялись бы.

информация Если при вычислении select! несколько каналов имеют ожидающие сообщения, только из одного канала извлекается значение. Все остальные каналы остаются нетронутыми, и их сообщения остаются в этих каналах до следующей итерации цикла. Ни одно сообщение не теряется.

Возобновление асинхронной операции

Теперь мы покажем, как запускать асинхронную операцию в нескольких вызовах select!. В этом примере у нас есть MPSC-канал с типом элемента i32 и асинхронная функция. Мы хотим запускать асинхронную функцию до тех пор, пока она не завершится или не будет получено четное целое число из канала.

async fn action() {
    // Некоторая асинхронная логика
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    
  tokio::spawn(async move {
      let _ = tx.send(1).await;
      let _ = tx.send(2).await;
  });
    
    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

Обратите внимание, что вместо вызова action() в макросе select!, он вызывается вне цикла. Возврат action() присваивается operation без вызова .await. Затем мы вызываем tokio::pin! на operation.

Внутри цикла select! вместо передачи operation мы передаем &mut operation. Переменная operation отслеживает выполняющуюся асинхронную операцию. Каждая итерация цикла использует ту же операцию вместо вызова нового вызова action().

Другая ветка select! получает сообщение из канала. Если сообщение четное, мы заканчиваем цикл. В противном случае снова запускаем select!.

Это первый раз, когда мы используем tokio::pin!. Мы не будем вдаваться в детали закрепления (pinning) сейчас. Важно отметить, что для .await ссылки значение, на которое ссылаются, должно быть закреплено или реализовывать Unpin.

Если мы удалим строку tokio::pin! и попытаемся скомпилировать, мы получим следующую ошибку:

error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

Хотя мы рассматривали Future в предыдущей главе, эта ошибка все еще не очень понятна. Если вы столкнулись с такой ошибкой о том, что Future не реализован при попытке вызвать .await на ссылке, то future, вероятно, нужно закрепить.

Подробнее о Pin в стандартной библиотеке.

Изменение ветки

Давайте рассмотрим немного более сложный цикл. У нас есть:

  1. Канал значений i32.
  2. Асинхронная операция для выполнения над значениями i32.

Логика, которую мы хотим реализовать:

  1. Ждать четное число в канале.
  2. Запустить асинхронную операцию, используя четное число в качестве ввода.
  3. Ждать операцию, но в то же время слушать больше четных чисел в канале.
  4. Если новое четное число получено до завершения существующей операции, прервать существующую операцию и начать ее заново с новым четным числом.
async fn action(input: Option<i32>) -> Option<String> {
    // Если ввод `None`, вернуть `None`.
    // Это также можно записать как `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // асинхронная логика здесь
  Some(i.to_string())
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
    
    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);
    
    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });
    
    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` - метод на `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

Мы используем стратегию, аналогичную предыдущему примеру. Асинхронная функция вызывается вне цикла и присваивается operation. Переменная operation закрепляется. Цикл выбирает между operation и получателем канала.

Обратите внимание, как action принимает Option<i32> в качестве аргумента. Прежде чем мы получим первое четное число, нам нужно создать экземпляр operation во что-то. Мы заставляем action принимать Option и возвращать Option. Если передается None, возвращается None. Первая итерация цикла, operation немедленно завершается с None.

Этот пример использует некоторый новый синтаксис. Первая ветка включает , if !done. Это предусловие ветки. Прежде чем объяснять, как это работает, давайте посмотрим, что произойдет, если предусловие опущено. Если опустить , if !done и запустить пример, получится следующий вывод:

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Эта ошибка происходит при попытке использовать operation после того, как она уже завершилась. Обычно при использовании .await значение, которое ожидается, потребляется. В этом примере мы ожидаем ссылку. Это означает, что operation все еще существует после ее завершения.

Чтобы избежать этой паники, мы должны позаботиться о том, чтобы отключить первую ветку, если operation завершилась. Переменная done используется для отслеживания, завершилась ли operation. Ветка select! может включать предусловие. Это предусловие проверяется до того, как select! ожидает ветку. Если условие вычисляется в false, то ветка отключается. Переменная done инициализируется в false. Когда operation завершается, done устанавливается в true. Следующая итерация цикла отключит ветку operation. Когда из канала получено четное сообщение, operation сбрасывается, и done устанавливается в false.

Параллелизм на уровне задачи

И tokio::spawn, и select! позволяют запускать параллельные асинхронные операции. Однако стратегия, используемая для запуска параллельных операций, отличается. Функция tokio::spawn принимает асинхронную операцию и создает новую задачу для ее выполнения. Задача - это объект, который планируется средой выполнения Tokio. Две разные задачи планируются Tokio независимо. Они могут выполняться одновременно в разных потоках операционной системы. Из-за этого созданная задача имеет те же ограничения, что и созданный поток: без заимствования.

Макрос select! запускает все ветки параллельно в одной и той же задаче. Поскольку все ветки макроса select! выполняются в одной и той же задаче, они никогда не будут выполняться одновременно. Макрос select! мультиплексирует асинхронные операции в одной задаче.