Каналы

Теперь, когда мы немного узнали о параллелизме с Tokio, давайте применим это на стороне клиента. Поместим код сервера, который мы написали ранее, в отдельный бинарный файл:

$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs

и создадим новый бинарный файл, который будет содержать код клиента:

$ touch src/bin/client.rs

В этом файле вы будете писать код этой страницы. Когда вы захотите запустить его, вам нужно будет сначала запустить сервер в отдельном окне терминала:

$ cargo run --bin server

А затем клиент отдельно:

$ cargo run --bin client

Что ж, давайте программировать!

Допустим, мы хотим выполнить две параллельные команды Redis. Мы можем создать по одной задаче для каждой команды. Тогда две команды будут выполняться параллельно.

Сначала мы можем попробовать что-то вроде:

use mini_redis::client;

#[tokio::main]
async fn main() {
    // Устанавливаем соединение с сервером
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Создаем две задачи: одна получает ключ, другая устанавливает ключ
    let t1 = tokio::spawn(async {
        let res = client.get("foo").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

Это не компилируется, потому что обе задачи должны как-то обращаться к client. Поскольку Client не реализует Copy, код не скомпилируется без некоторого кода для облегчения этого совместного использования. Кроме того, Client::set принимает &mut self, что означает, что требуется эксклюзивный доступ для его вызова. Мы могли бы открыть соединение для каждой задачи, но это не идеально. Мы не можем использовать std::sync::Mutex, так как .await нужно вызывать с удержанной блокировкой. Мы могли бы использовать tokio::sync::Mutex, но это позволило бы выполнять только один запрос одновременно. Если клиент реализует конвейеризацию (кратко: отправка многих команд без ожидания ответа на каждую предыдущую команду), асинхронный мьютекс приводит к неполному использованию соединения.

Передача сообщений

Ответ заключается в использовании передачи сообщений. Шаблон включает создание выделенной задачи для управления ресурсом client. Любая задача, которая хочет выполнить запрос, отправляет сообщение задаче client. Задача client выполняет запрос от имени отправителя, и ответ отправляется обратно отправителю.

Используя эту стратегию, устанавливается единственное соединение. Задача, управляющая client, может получить эксклюзивный доступ для вызова get и set. Кроме того, канал работает как буфер. Операции могут быть отправлены задаче client, пока она занята. Как только задача client становится доступной для обработки новых запросов, она извлекает следующий запрос из канала. Это может привести к лучшей пропускной способности и быть расширено для поддержки пула соединений.

Примитивы каналов Tokio

Tokio предоставляет несколько каналов, каждый из которых служит своей цели.

  • mpsc: многопоточный отправитель, однопоточный получатель. Можно отправить много значений.
  • oneshot: однопоточный отправитель, однопоточный получатель. Можно отправить одно значение.
  • broadcast: многопоточный отправитель, многопоточный получатель. Можно отправить много значений. Каждый получатель видит каждое значение.
  • watch: многопоточный отправитель, многопоточный получатель. Можно отправить много значений, но история не сохраняется. Получатели видят только последнее значение.

Если вам нужен многопоточный отправитель/многопоточный получатель канала, где только один получатель видит каждое сообщение, вы можете использовать крейт async-channel. Также существуют каналы для использования вне асинхронного Rust, такие как std::sync::mpsc и crossbeam::channel. Эти каналы ожидают сообщения, блокируя поток, что не допускается в асинхронном коде.

В этом разделе мы будем использовать mpsc и oneshot. Другие типы каналов передачи сообщений исследуются в последующих разделах. Полный код из этого раздела находится здесь.

Определение типа сообщения

В большинстве случаев, при использовании передачи сообщений, задача, получающая сообщения, отвечает более чем на одну команду. В нашем случае задача будет отвечать на команды GET и SET. Чтобы смоделировать это, мы сначала определим перечисление Command и включим вариант для каждого типа команды.

#![allow(unused)]
fn main() {
use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}
}

Создание канала

В функции main создается канал mpsc.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Создаем новый канал с емкостью не более 32.
    let (tx, mut rx) = mpsc::channel(32);
tx.send(()).await.unwrap();

    // ... Дальнейший код здесь
}

Канал mpsc используется для отправки команд задаче, управляющей соединением redis. Возможность многопоточности позволяет отправлять сообщения из многих задач. Создание канала возвращает два значения: отправитель и получатель. Два handle используются отдельно. Они могут быть перемещены в разные задачи.

Канал создается с емкостью 32. Если сообщения отправляются быстрее, чем принимаются, канал будет их хранить. После того как в канале сохранено 32 сообщения, вызов send(...).await перейдет в режим ожидания, пока сообщение не будет удалено получателем.

Отправка из нескольких задач осуществляется путем клонирования Sender. Например:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

Оба сообщения отправляются в единственный handle Receiver. Невозможно клонировать получателя канала mpsc.

Когда каждый Sender вышел из области видимости или иным образом был удален, больше невозможно отправлять сообщения в канал. В этот момент вызов recv на Receiver вернет None, что означает, что все отправители исчезли и канал закрыт.

В нашем случае задачи, управляющей соединением Redis, она знает, что может закрыть соединение Redis, когда канал закрыт, так как соединение больше не будет использоваться.

Создание задачи-менеджера

Далее создадим задачу, которая обрабатывает сообщения из канала. Сначала устанавливается клиентское соединение с Redis. Затем полученные команды выполняются через соединение Redis.

#![allow(unused)]
fn main() {
use mini_redis::client;
enum Command {
   Get { key: String },
   Set { key: String, val: bytes::Bytes }
}
async fn dox() {
let (_, mut rx) = tokio::sync::mpsc::channel(10);
// Ключевое слово `move` используется для **перемещения** владения `rx` в задачу.
let manager = tokio::spawn(async move {
    // Устанавливаем соединение с сервером
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Начинаем получать сообщения
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});
}
}

Теперь обновим две задачи для отправки команд через канал вместо их непосредственного выполнения на соединении Redis.

#![allow(unused)]
fn main() {
#[derive(Debug)]
enum Command {
   Get { key: String },
   Set { key: String, val: bytes::Bytes }
}
async fn dox() {
let (mut tx, _) = tokio::sync::mpsc::channel(10);
// Handle `Sender` перемещаются в задачи. Поскольку есть две задачи, нам нужен второй `Sender`.
let tx2 = tx.clone();

// Создаем две задачи: одна получает ключ, другая устанавливает ключ
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "foo".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});
}
}

Внизу функции main мы используем .await для join handles, чтобы гарантировать полное завершение команд перед выходом из процесса.

#![allow(unused)]
fn main() {
type Jh = tokio::task::JoinHandle<()>;
async fn dox(t1: Jh, t2: Jh, manager: Jh) {
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
}

Получение ответов

Последний шаг — получение ответа от задачи-менеджера. Команде GET нужно получить значение, а команде SET нужно знать, успешно ли завершилась операция.

Для передачи ответа используется канал oneshot. Канал oneshot — это канал с одним отправителем и одним получателем, оптимизированный для отправки одного значения. В нашем случае единственное значение — это ответ.

Аналогично mpsc, oneshot::channel() возвращает handle отправителя и получателя.

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

async fn dox() {
let (tx, rx) = oneshot::channel();
tx.send(()).unwrap();
}
}

В отличие от mpsc, емкость не указывается, так как она всегда равна единице. Кроме того, ни один из handle не может быть клонирован.

Для получения ответов от задачи-менеджера, перед отправкой команды создается канал oneshot. Половина Sender канала включается в команду для задачи-менеджера. Половина получения используется для получения ответа.

Сначала обновим Command, чтобы включить Sender. Для удобства используется псевдоним типа для ссылки на Sender.

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;
use bytes::Bytes;

/// Несколько различных команд мультиплексируются через один канал.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// Предоставляется запрашивающей стороной и используется задачей-менеджером для отправки ответа на команду обратно запрашивающей стороне.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
}

Теперь обновим задачи, выполняющие команды, чтобы включить oneshot::Sender.

#![allow(unused)]
fn main() {
use tokio::sync::{oneshot, mpsc};
use bytes::Bytes;
#[derive(Debug)]
enum Command {
    Get { key: String, resp: Responder<Option<bytes::Bytes>> },
    Set { key: String, val: Bytes, resp: Responder<()> },
}
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
fn dox() {
let (mut tx, mut rx) = mpsc::channel(10);
let mut tx2 = tx.clone();
let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "foo".to_string(),
        resp: resp_tx,
    };

    // Отправляем запрос GET
    tx.send(cmd).await.unwrap();

    // Ожидаем ответ
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // Отправляем запрос SET
    tx2.send(cmd).await.unwrap();

    // Ожидаем ответ
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});
}
}

Наконец, обновим задачу-менеджер для отправки ответа через канал oneshot.

#![allow(unused)]
fn main() {
use tokio::sync::{oneshot, mpsc};
use bytes::Bytes;
#[derive(Debug)]
enum Command {
    Get { key: String, resp: Responder<Option<bytes::Bytes>> },
    Set { key: String, val: Bytes, resp: Responder<()> },
}
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
async fn dox(mut client: mini_redis::client::Client) {
let (_, mut rx) = mpsc::channel::<Command>(10);
while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Игнорируем ошибки
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // Игнорируем ошибки
            let _ = resp.send(res);
        }
    }
}
}
}

Вызов send на oneshot::Sender завершается немедленно и не требует .await. Это связано с тем, что send на канале oneshot всегда завершается неудачей или успехом немедленно без какого-либо ожидания.

Отправка значения на канал oneshot возвращает Err, когда половина получателя была удалена. Это указывает, что получатель больше не заинтересован в ответе. В нашем сценарии отмена интереса получателем является допустимым событием. Ошибка Err, возвращаемая resp.send(...), не требует обработки.

Вы можете найти полный код здесь.

Обратное давление и ограниченные каналы

Всякий раз, когда вводится параллелизм или очереди, важно обеспечить, чтобы очереди были ограничены и система могла корректно обрабатывать нагрузку. Неограниченные очереди в конечном итоге заполнят всю доступную память и вызовут сбой системы непредсказуемым образом.

Tokio заботится о том, чтобы избежать неявного создания очередей. Большая часть этого заключается в том, что асинхронные операции ленивы. Рассмотрим следующее:

fn async_op() {}
fn dox() {
loop {
    async_op();
}
}
fn main() {}

Если асинхронная операция выполняется активно, цикл будет повторно ставить в очередь новую async_op для выполнения без обеспечения завершения предыдущей операции. Это приводит к неявному неограниченному созданию очередей. Системы на основе обратных вызовов и системы на основе активных future особенно подвержены этому.

Однако с Tokio и асинхронным Rust приведенный фрагмент не приведет к выполнению async_op вообще. Это потому, что .await никогда не вызывается. Если фрагмент обновлен для использования .await, то цикл ждет завершения операции перед началом нового цикла.

async fn async_op() {}
async fn dox() {
loop {
    // Не повторится, пока `async_op` не завершится
    async_op().await;
}
}
fn main() {}

Параллелизм и очереди должны вводиться явно. Способы сделать это включают:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

При этом позаботьтесь о том, чтобы общий объем параллелизма был ограничен. Например, при написании цикла принятия TCP убедитесь, что общее количество открытых сокетов ограничено. При использовании mpsc::channel выберите управляемую емкость канала. Конкретные значения границ будут зависеть от приложения.

Забота и выбор хороших границ — большая часть написания надежных приложений Tokio.