Учебник Tokio
Учебное руководство
Tokio — это асинхронная среда выполнения (runtime) для языка программирования Rust. Она предоставляет строительные блоки, необходимые для написания сетевых приложений. Tokio дает гибкость для работы с широким спектром систем — от больших серверов с десятками ядер до небольших встраиваемых устройств.
На высоком уровне Tokio предоставляет несколько основных компонентов:
- Многопоточную среду выполнения для исполнения асинхронного кода
- Асинхронную версию стандартной библиотеки
- Большую экосистему библиотек
Роль Tokio в вашем проекте
Когда вы пишете приложение в асинхронном стиле, вы позволяете ему лучше масштабироваться за счет снижения стоимости одновременного выполнения множества операций. Однако асинхронный код Rust не работает сам по себе, поэтому вам необходимо выбрать среду выполнения. Библиотека Tokio является наиболее широко используемой средой выполнения, превосходящей по использованию все остальные среды вместе взятые.
Кроме того, Tokio предоставляет множество полезных утилит. При написании асинхронного кода вы не можете использовать обычные блокирующие API, предоставляемые стандартной библиотекой Rust, и вместо этого должны использовать их асинхронные версии. Эти альтернативные версии предоставляются Tokio и повторяют API стандартной библиотеки Rust там, где это имеет смысл.
Преимущества Tokio
В этом разделе описаны некоторые преимущества Tokio.
Быстродействие Tokio — быстрый, построенный на основе языка программирования Rust, который сам по себе быстрый. Это сделано в духе Rust с целью, чтобы вы не могли улучшить производительность, написав эквивалентный код вручную.
Tokio — масштабируемый, построенный на основе функциональности языка async/await, которая сама по себе масштабируема. При работе с сетевыми соединениями существует предел скорости обработки одного соединения из-за задержек, поэтому единственный способ масштабирования — одновременная обработка множества соединений. С функцией языка async/await увеличение количества concurrent-операций становится incredibly дешевым, позволяя масштабироваться до большого количества одновременных задач.
Надежность Tokio построен с использованием Rust — языка, который позволяет каждому создавать надежное и эффективное программное обеспечение. Ряд исследований показал, что примерно ~70% критических уязвимостей безопасности являются результатом небезопасной работы с памятью. Использование Rust устраняет весь этот класс ошибок в ваших приложениях.
Tokio также уделяет большое внимание обеспечению последовательного поведения без сюрпризов. Основная цель Tokio — позволить пользователям развертывать предсказуемое программное обеспечение, которое будет работать одинаково изо дня в день с надежным временем отклика и без непредсказуемых скачков задержки.
Простота С функцией async/await в Rust сложность написания асинхронных приложений существенно снизилась. В сочетании с утилитами Tokio и vibrant экосистемой написание приложений становится легким.
Tokio следует соглашениям об именовании стандартной библиотеки, где это имеет смысл. Это позволяет легко преобразовывать код, написанный только со стандартной библиотекой, в код, написанный с Tokio. С сильной системой типов Rust возможность легко создавать корректный код не имеет аналогов.
Гибкость Tokio предоставляет несколько вариантов среды выполнения — от многопоточной среды с work-stealing до легковесной однопоточной среды. Каждая из этих сред выполнения поставляется с множеством настроек, позволяющих пользователям адаптировать их к своим потребностям.
Когда не следует использовать Tokio
Хотя Tokio полезен для многих проектов, которым необходимо выполнять множество операций одновременно, есть также случаи, когда Tokio не подходит.
- Ускорение вычислений, ограниченных производительностью CPU, путем их параллельного выполнения в нескольких потоках. Tokio разработан для приложений, ограниченных производительностью I/O, где каждая отдельная задача тратит большую часть времени на ожидание I/O. Если ваше приложение только выполняет параллельные вычисления, вам следует использовать rayon. Тем не менее, все еще возможно "смешивать и сочетать", если вам нужно делать и то, и другое.
- Чтение большого количества файлов. Хотя может показаться, что Tokio будет полезен для проектов, которым просто нужно читать много файлов, Tokio не дает здесь преимуществ по сравнению с обычным пулом потоков. Это связано с тем, что операционные системы обычно не предоставляют асинхронные API для работы с файлами.
- Отправка одного веб-запроса. Tokio дает преимущество, когда вам нужно делать много вещей одновременно. Если вам нужно использовать библиотеку, предназначенную для асинхронного Rust, такую как reqwest, но вам не нужно делать много операций одновременно, вам следует предпочесть блокирующую версию этой библиотеки, так как это упростит ваш проект. Использование Tokio, конечно, все равно будет работать, но не даст реальных преимуществ по сравнению с блокирующим API.
Получение помощи
В любой момент, если вы застряли, вы всегда можете получить помощь в Discord или обсуждениях на GitHub. Не беспокойтесь о задавании "начальных" вопросов. Мы все с чего-то начинали и рады помочь.
Определения
Асинхронность
В контексте Rust асинхронный код относится к коду, который использует функцию языка async/await, позволяющую многим задачам работать конкурентно на нескольких потоках (или даже одном потоке).
Конкурентность и параллелизм
Конкурентность и параллелизм — это два связанных понятия, которые используются при обсуждении выполнения нескольких задач одновременно. Если что-то происходит параллельно, то это также происходит конкурентно, но обратное неверно: переключение между двумя задачами без фактической одновременной работы над ними — это конкурентность, но не параллелизм.
Future (Футура/Будущее)
Future — это значение, которое хранит текущее состояние некоторой операции. Future также имеет метод poll, который позволяет операции продолжаться до тех пор, пока ей не потребуется ожидать чего-либо, например, сетевого соединения. Вызовы метода poll должны возвращаться очень быстро.
Future часто создаются путем комбинирования нескольких future с использованием .await в async-блоке.
Исполнитель/Планировщик
Исполнитель или планировщик — это нечто, что выполняет future, многократно вызывая метод poll. В стандартной библиотеке нет исполнителя, поэтому для этого нужна внешняя библиотека, и наиболее широко используемый исполнитель предоставляется средой выполнения Tokio.
Исполнитель способен запускать большое количество future конкурентно на нескольких потоках. Он делает это, переключая текущую задачу в точках await. Если код проводит много времени без достижения .await, это называется "блокировкой потока" или "не возвращением управления исполнителю", что мешает запуску других задач.
Среда выполнения
Среда выполнения — это библиотека, которая содержит исполнитель вместе с различными утилитами, интегрированными с этим исполнителем, такими как утилиты времени и ввода-вывода. Слова "среда выполнения" и "исполнитель" иногда используются взаимозаменяемо. В стандартной библиотеке нет среды выполнения, поэтому для этого нужна внешняя библиотека, и наиболее широко используемой средой выполнения является среда выполнения Tokio.
Слово "Runtime" также используется в других контекстах, например, фраза "У Rust нет runtime" иногда означает, что Rust не выполняет сборку мусора или JIT-компиляцию.
Задача
Задача — это операция, выполняемая в среде выполнения Tokio, создаваемая функцией tokio::spawn или Runtime::block_on. Инструменты для создания future путем их комбинирования, такие как .await и join!, не создают новых задач, и каждая комбинируемая часть считается "в той же задаче".
Для параллелизма требуется несколько задач, но возможно конкурентно выполнять несколько операций в одной задаче с помощью таких инструментов, как join!.
Порождающее создание
Порождающее создание — это когда функция tokio::spawn используется для создания новой задачи. Это также может относиться к созданию нового потока с помощью std::thread::spawn.
Async-блок
Async-блок — это простой способ создать future, который выполняет некоторый код. Например:
#![allow(unused)] fn main() { let world = async { println!(" world!"); }; let my_future = async { print!("Hello "); world.await; }; }
Код выше создает future с именем my_future, который при выполнении печатает Hello world!. Он делает это, сначала печатая "Hello", а затем выполняя future world. Обратите внимание, что сам по себе этот код ничего не печатает — вам нужно фактически выполнить my_future, либо напрямую создав его как задачу, либо используя .await в чем-то, что вы создаете.
Async-функция
Подобно async-блоку, async-функция — это простой способ создать функцию, тело которой становится future. Все async-функции можно переписать в обычные функции, возвращающие future:
#![allow(unused)] fn main() { async fn do_stuff(i: i32) -> String { // do stuff format!("The integer is {}.", i) } }
#![allow(unused)] fn main() { use std::future::Future; // async-функция выше аналогична этой: fn do_stuff(i: i32) -> impl Future<Output = String> { async move { // do stuff format!("The integer is {}.", i) } } }
Здесь используется синтаксис impl Trait для возврата future, поскольку Future — это трейт. Обратите внимание, что поскольку future, созданный async-блоком, ничего не делает до своего выполнения, вызов async-функции ничего не делает до тех пор, пока не будет выполнен возвращаемый future (игнорирование этого вызывает предупреждение).
Уступка управления
В контексте асинхронного Rust уступка управления — это то, что позволяет исполнителю выполнять множество future в одном потоке. Каждый раз, когда future уступает управление, исполнитель может заменить этот future другим future, и, многократно переключая текущую задачу, исполнитель может конкурентно выполнять большое количество задач. Future может уступить управление только в .await, поэтому future, которые проводят много времени между .await, могут препятствовать выполнению других задач.
Если конкретно, future уступает управление, когда возвращается из метода poll.
Блокировка
Слово "блокировка" используется двумя разными способами: первое значение "блокировки" — это просто ожидание завершения чего-либо, а другое значение блокировки — когда future проводит много времени без уступки управления. Для однозначности можно использовать фразу "блокировка потока" для второго значения.
Документация Tokio всегда будет использовать второе значение "блокировки".
Для выполнения блокирующего кода в Tokio см. раздел Задачи, ограниченные CPU, и блокирующий код в справочнике по API Tokio.
Поток
Stream — это асинхронная версия Iterator, предоставляющая поток значений. Он обычно используется вместе с циклом while let, например:
#![allow(unused)] fn main() { use tokio_stream::StreamExt; // для next() async fn dox() { let mut stream = tokio_stream::empty::<()>(); while let Some(item) = stream.next().await { // что-то делаем } } }
Слово "поток" иногда по ошибке используется для обозначения трейтов AsyncRead и AsyncWrite.
Утилиты для потоков Tokio в настоящее время предоставляются крейтом tokio-stream. Когда трейт Stream стабилизируется в std, утилиты потоков будут перемещены в крейт tokio.
Канал
Канал — это инструмент, который позволяет одной части кода отправлять сообщения другим частям. Tokio предоставляет несколько каналов, каждый из которых служит своей цели.
- mpsc: многопоточный отправитель, однопоточный получатель. Можно отправить много значений.
- oneshot: однопоточный отправитель, однопоточный получатель. Можно отправить одно значение.
- broadcast: многопоточный отправитель, многопоточный получатель. Можно отправить много значений. Каждый получатель видит каждое значение.
- watch: однопоточный отправитель, многопоточный получатель. Можно отправить много значений, но история не сохраняется. Получатели видят только последнее значение.
Если вам нужен многопоточный отправитель/многопоточный получатель канала, где только один получатель видит каждое сообщение, вы можете использовать крейт async-channel.
Также существуют каналы для использования вне асинхронного Rust, такие как std::sync::mpsc и crossbeam::channel. Эти каналы ожидают сообщения, блокируя поток, что не допускается в асинхронном коде.
Обратное давление
Обратное давление — это шаблон проектирования приложений, которые хорошо справляются с высокой нагрузкой. Например, канал mpsc бывает как ограниченным, так и неограниченным. Используя ограниченный канал, получатель может создавать "обратное давление" на отправителя, если получатель не успевает обрабатывать количество сообщений, что позволяет избежать неограниченного роста использования памяти при отправке все большего количества сообщений по каналу.
Актор
Шаблон проектирования для создания приложений. Актор — это независимо созданная задача, которая управляет некоторым ресурсом от имени других частей приложения, используя каналы для связи с этими другими частями приложения.
См. главу о каналах для примера актора.
Установка
Это руководство шаг за шагом проведет вас через процесс создания Redis клиента и сервера. Мы начнем с основ асинхронного программирования на Rust и будем постепенно углубляться. Мы реализуем подмножество команд Redis, но получим комплексный обзор Tokio.
Mini-Redis
Проект, который вы будете создавать в этом руководстве, доступен как Mini-Redis на GitHub. Mini-Redis разработан с основной целью изучения Tokio и поэтому хорошо прокомментирован, но это также означает, что Mini-Redis отсутствуют некоторые функции, которые вы хотели бы видеть в реальной Redis-библиотеке. Вы можете найти готовые к использованию в production Redis-библиотеки на crates.io.
Мы будем напрямую использовать Mini-Redis в этом руководстве. Это позволяет нам использовать части Mini-Redis в руководстве до того, как мы реализуем их позже.
Получение помощи
В любой момент, если вы застряли, вы всегда можете получить помощь в Discord или обсуждениях на GitHub. Не беспокойтесь о задавании "начальных" вопросов. Мы все с чего-то начинали и рады помочь.
Предварительные требования
Читатели должны уже быть знакомы с Rust. Книга по Rust — отличный ресурс для начала.
Хотя это и не обязательно, некоторый опыт написания сетевого кода с использованием стандартной библиотеки Rust или другого языка может быть полезен.
Знание Redis не требуется.
Rust
Прежде чем начать, убедитесь, что у вас установлен и готов к работе набор инструментов Rust. Если у вас его нет, самый простой способ установить — использовать rustup.
Это руководство требует минимальной версии Rust 1.45.0, но рекомендуется использовать последнюю стабильную версию Rust.
Чтобы проверить, установлен ли Rust на вашем компьютере, выполните:
$ rustc --version
Вы должны увидеть вывод примерно такой: rustc 1.46.0 (04488afe3 2020-08-24).
Сервер Mini-Redis
Далее установите сервер Mini-Redis. Он будет использоваться для тестирования нашего клиента по мере его создания.
$ cargo install mini-redis
Убедитесь, что установка прошла успешно, запустив сервер:
$ mini-redis-server
Затем в отдельном окне терминала попробуйте получить ключ foo с помощью mini-redis-cli:
$ mini-redis-cli get foo
Вы должны увидеть (nil).
Все готово
Вот и все, все готово к работе. Перейдите на следующую страницу, чтобы написать свое первое асинхронное приложение на Rust.
hello tokio
Мы начнем с написания очень простого приложения на Tokio. Оно подключится к серверу Mini-Redis, установит значение ключа hello в world, а затем прочитает этот ключ обратно. Это будет сделано с использованием клиентской библиотеки Mini-Redis.
Код
Генерация нового крейта
Давайте начнем с создания нового Rust-приложения:
$ cargo new my-redis
$ cd my-redis
Добавление зависимостей
Затем откройте Cargo.toml и добавьте следующее прямо под [dependencies]:
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"
Написание кода
Затем откройте main.rs и замените содержимое файла на:
use mini_redis::{client, Result}; fn dox() { #[tokio::main] async fn main() -> Result<()> { // Открываем соединение с адресом mini-redis. let mut client = client::connect("127.0.0.1:6379").await?; // Устанавливаем ключ "hello" со значением "world" client.set("hello", "world".into()).await?; // Получаем ключ "hello" let result = client.get("hello").await?; println!("got value from the server; result={:?}", result); Ok(()) } }
Убедитесь, что сервер Mini-Redis запущен. В отдельном окне терминала выполните:
$ mini-redis-server
Если вы еще не установили mini-redis, вы можете сделать это с помощью:
$ cargo install mini-redis
Теперь запустите приложение my-redis:
$ cargo run
got value from the server; result=Some(b"world")
Успех!
Полный код можно найти здесь.
Разбор кода
Давайте подробнее разберем, что мы только что сделали. Кода немного, но происходит много всего.
#![allow(unused)] fn main() { use mini_redis::client; async fn dox() -> mini_redis::Result<()> { let mut client = client::connect("127.0.0.1:6379").await?; Ok(()) } }
Функция client::connect предоставляется крейтом mini-redis. Она асинхронно устанавливает TCP-соединение с указанным удаленным адресом. После установления соединения возвращается handle client. Хотя операция выполняется асинхронно, код, который мы пишем, выглядит синхронным. Единственное указание на то, что операция асинхронная — это оператор .await.
Что такое асинхронное программирование?
Большинство компьютерных программ выполняются в том же порядке, в котором они написаны. Сначала выполняется первая строка, затем следующая и так далее. При синхронном программировании, когда программа встречает операцию, которая не может быть завершена немедленно, она блокируется до завершения операции. Например, установление TCP-соединения требует обмена с узлом по сети, что может занять значительное время. В течение этого времени поток заблокирован.
При асинхронном программировании операции, которые не могут быть завершены немедленно, приостанавливаются и переносятся в фон. Поток не блокируется и может продолжать выполнять другие задачи. Как только операция завершается, задача возобновляется и продолжает обработку с того места, где она остановилась. В нашем предыдущем примере была только одна задача, поэтому ничего не происходит, пока она приостановлена, но асинхронные программы обычно имеют много таких задач.
Хотя асинхронное программирование может привести к более быстрым приложениям, оно часто приводит к гораздо более сложным программам. Программист должен отслеживать все состояние, необходимое для возобновления работы после завершения асинхронной операции. Исторически это была утомительная и подверженная ошибкам задача.
Зеленые потоки во время компиляции
Rust реализует асинхронное программирование с помощью функции под названием async/await. Функции, которые выполняют асинхронные операции, помечаются ключевым словом async. В нашем примере функция connect определена так:
#![allow(unused)] fn main() { use mini_redis::Result; use mini_redis::client::Client; use tokio::net::ToSocketAddrs; pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> { // ... unimplemented!() } }
Определение async fn выглядит как обычная синхронная функция, но работает асинхронно. Rust преобразует async fn во время компиляции в процедуру, которая работает асинхронно. Любые вызовы .await внутри async fn возвращают управление обратно потоку. Поток может выполнять другую работу, пока операция обрабатывается в фоне.
предупреждение Хотя другие языки также реализуют
async/await, Rust использует уникальный подход. В первую очередь, асинхронные операции Rust ленивы. Это приводит к другой семантике выполнения по сравнению с другими языками.
Если это пока не совсем понятно, не волнуйтесь. Мы подробнее исследуем async/await в этом руководстве.
Использование async/await
Асинхронные функции вызываются как любые другие функции Rust. Однако вызов этих функций не приводит к выполнению тела функции. Вместо этого вызов async fn возвращает значение, представляющее операцию. Это концептуально аналогично замыканию без аргументов. Чтобы фактически выполнить операцию, вы должны использовать оператор .await для возвращаемого значения.
Например, данная программа
async fn say_world() { println!("world"); } #[tokio::main] async fn main() { // Вызов `say_world()` не выполняет тело `say_world()`. let op = say_world(); // Этот println! выводится первым println!("hello"); // Вызов `.await` на `op` начинает выполнение `say_world`. op.await; }
выводит:
hello
world
Возвращаемое значение async fn — это анонимный тип, реализующий трейт Future.
Асинхронная функция main
Главная функция, используемая для запуска приложения, отличается от обычной, встречающейся в большинстве крейтов Rust.
- Это
async fn - Она аннотирована с
#[tokio::main]
async fn используется, потому что мы хотим войти в асинхронный контекст. Однако асинхронные функции должны выполняться средой выполнения (runtime). Среда выполнения содержит планировщик асинхронных задач, предоставляет событийный I/O, таймеры и т.д. Среда выполнения не запускается автоматически, поэтому главная функция должна её запустить.
Функция #[tokio::main] — это макрос. Она преобразует async fn main() в синхронную fn main(), которая инициализирует экземпляр среды выполнения и выполняет асинхронную главную функцию.
Например, следующий код:
#[tokio::main] async fn main() { println!("hello"); }
преобразуется в:
fn main() { let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { println!("hello"); }) }
Подробности среды выполнения Tokio будут рассмотрены позже.
Cargo features
При подключении Tokio для этого руководства был включен флаг функции full:
tokio = { version = "1", features = ["full"] }
Tokio имеет много функциональности (TCP, UDP, Unix-сокеты, таймеры, утилиты синхронизации, несколько типов планировщиков и т.д.). Не всем приложениям нужна вся функциональность. При попытке оптимизировать время компиляции или размер конечного приложения, приложение может выбрать только те функции, которые оно использует.
Spawning
Мы переключимся и начнем работать над сервером Redis.
Сначала переместим клиентский код SET/GET из предыдущего раздела в примерный файл. Таким образом, мы сможем запускать его против нашего сервера.
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
Затем создайте новый, пустой src/main.rs и продолжайте.
Принятие сокетов
Первое, что должен сделать наш сервер Redis — это принимать входящие TCP-сокеты. Это делается путем привязки tokio::net::TcpListener к порту 6379.
информация Многие типы Tokio называются так же, как их синхронные эквиваленты в стандартной библиотеке Rust. Когда это имеет смысл, Tokio предоставляет те же API, что и
std, но используяasync fn.
Затем сокеты принимаются в цикле. Каждый сокет обрабатывается, затем закрывается. Пока мы будем читать команду, выводить ее в stdout и отвечать ошибкой.
src/main.rs
use tokio::net::{TcpListener, TcpStream}; use mini_redis::{Connection, Frame}; fn dox() { #[tokio::main] async fn main() { // Привязываем слушателя к адресу let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { // Второй элемент содержит IP и порт нового соединения. let (socket, _) = listener.accept().await.unwrap(); process(socket).await; } } } async fn process(socket: TcpStream) { // `Connection` позволяет нам читать/писать **фреймы** redis вместо потоков байтов. // Тип `Connection` определен в mini-redis. let mut connection = Connection::new(socket); if let Some(frame) = connection.read_frame().await.unwrap() { println!("GOT: {:?}", frame); // Отвечаем ошибкой let response = Frame::Error("unimplemented".to_string()); connection.write_frame(&response).await.unwrap(); } }
Теперь запустите этот цикл принятия:
$ cargo run
В отдельном окне терминала запустите пример hello-redis (команда SET/GET из предыдущего раздела, которая играет роль клиента Redis):
$ cargo run --example hello-redis
Вывод должен быть:
Error: "unimplemented"
В терминале сервера вывод:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
Конкурентность
У нашего сервера есть небольшая проблема (кроме того, что он отвечает только ошибками). Он обрабатывает входящие запросы по одному. Когда соединение принято, сервер остается внутри блокировки цикла принятия до тех пор, пока ответ полностью не записан в сокет.
Мы хотим, чтобы наш сервер Redis обрабатывал много конкурентных запросов. Для этого нам нужно добавить немного конкурентности.
информация Конкурентность и параллелизм — не одно и то же. Если вы переключаетесь между двумя задачами, то вы работаете над обеими задачами конкурентно, но не параллельно. Чтобы это считалось параллельным, вам нужно два человека, каждый из которых посвящен своей задаче.
Одно из преимуществ использования Tokio заключается в том, что асинхронный код позволяет вам работать над многими задачами конкурентно, без необходимости работать над ними параллельно с использованием обычных потоков. Фактически, Tokio может запускать многие задачи конкурентно в одном потоке!
Чтобы обрабатывать соединения конкурентно, для каждого входящего соединения создается новая задача. Соединение обрабатывается в этой задаче.
Цикл принятия становится:
use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { let (socket, _) = listener.accept().await.unwrap(); // Для каждого входящего сокета создается новая задача. Сокет перемещается в новую задачу и обрабатывается там. tokio::spawn(async move { process(socket).await; }); } } } async fn process(_: tokio::net::TcpStream) {}
Задачи
Задача Tokio — это асинхронный зеленый поток. Они создаются путем передачи async блока в tokio::spawn. Функция tokio::spawn возвращает JoinHandle, который вызывающая сторона может использовать для взаимодействия с созданной задачей. async блок может иметь возвращаемое значение. Вызывающая сторона может получить возвращаемое значение с помощью .await на JoinHandle.
Например:
#[tokio::main] async fn main() { let handle = tokio::spawn(async { // Выполняем некоторую асинхронную работу "return value" }); // Выполняем другую работу let out = handle.await.unwrap(); println!("GOT {}", out); }
Ожидание JoinHandle возвращает Result. Когда задача встречает ошибку во время выполнения, JoinHandle вернет Err. Это происходит, когда задача либо паникует, либо когда задача принудительно отменяется из-за остановки среды выполнения.
Задачи — это единица выполнения, управляемая планировщиком. Создание задачи отправляет ее в планировщик Tokio, который затем обеспечивает выполнение задачи, когда у нее есть работа. Созданная задача может выполняться в том же потоке, где она была создана, или может выполняться в другом потоке среды выполнения. Задача также может быть перемещена между потоками после создания.
Задачи в Tokio очень легковесны. Под капотом они требуют только одно выделение памяти и 64 байта памяти. Приложения должны свободно создавать тысячи, если не миллионы задач.
Ограничение 'static
Когда вы создаете задачу в среде выполнения Tokio, время жизни ее типа должно быть 'static. Это означает, что созданная задача не должна содержать ссылок на данные, владеемые вне задачи.
информация Распространенное заблуждение, что
'staticвсегда означает "живет вечно", но это не так. Только потому, что значение'static, не означает, что у вас утечка памяти. Вы можете прочитать больше в Common Rust Lifetime Misconceptions.
Например, следующий код не скомпилируется:
use tokio::task; #[tokio::main] async fn main() { let v = vec![1, 2, 3]; task::spawn(async { println!("Here's a vec: {:?}", v); }); }
Попытка компиляции приводит к следующей ошибке:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
Это происходит потому, что по умолчанию переменные не перемещаются в async блоки. Вектор v остается во владении функции main. Строка println! заимствует v. Компилятор Rust любезно объясняет это нам и даже предлагает исправление! Изменение строки 7 на task::spawn(async move { предпишет компилятору переместить v в созданную задачу. Теперь задача владеет всеми своими данными, делая их 'static.
Если к одним данным должен быть доступ из более чем одной задачи конкурентно, то они должны быть общими с использованием примитивов синхронизации, таких как Arc.
Обратите внимание, что сообщение об ошибке говорит о том, что тип аргумента переживает время жизни 'static. Эта терминология может быть довольно запутанной, потому что время жизни 'static длится до конца программы, так что если он переживает его, разве это не утечка памяти? Объяснение в том, что это тип, а не значение должен переживать время жизни 'static, и значение может быть уничтожено до того, как его тип станет недействительным.
Когда мы говорим, что значение 'static, все это означает, что не будет ошибкой хранить это значение вечно. Это важно, потому что компилятор не может рассуждать о том, как долго новая задача остается активной. Мы должны убедиться, что задаче разрешено жить вечно, чтобы Tokio мог заставить задачу работать так долго, как это необходимо.
Статья, на которую ссылается предыдущий информационный блок, использует терминологию "ограничено 'static" вместо "его тип переживает 'static" или "значение 'static" для ссылки на T: 'static. Все это означает одно и то же, но отличается от "аннотировано с 'static" как в &'static T.
Ограничение Send
Задачи, созданные с помощью tokio::spawn, должны реализовывать Send. Это позволяет среде выполнения Tokio перемещать задачи между потоками, пока они приостановлены на .await.
Задачи являются Send, когда все данные, которые хранятся между вызовами .await, являются Send. Это немного тонко. Когда вызывается .await, задача возвращается к планировщику. В следующий раз, когда задача выполняется, она возобновляется с точки, где она последний раз уступила. Чтобы это работало, все состояние, которое используется после .await, должно быть сохранено задачей. Если это состояние Send, т.е. может быть перемещено между потоками, то сама задача может быть перемещена между потоками. И наоборот, если состояние не Send, то и задача тоже.
Например, это работает:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { // Область видимости заставляет `rc` удалиться до `.await`. { let rc = Rc::new("hello"); println!("{}", rc); } // `rc` больше не используется. Он **не** сохраняется, когда задача уступает планировщику yield_now().await; }); }
А это нет:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { let rc = Rc::new("hello"); // `rc` используется после `.await`. Он должен быть сохранен в состоянии задачи. yield_now().await; println!("{}", rc); }); }
Попытка компиляции фрагмента приводит к:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
Мы обсудим особый случай этой ошибки более подробно в следующей главе.
Хранение значений
Теперь мы реализуем функцию process для обработки входящих команд. Мы будем использовать HashMap для хранения значений. Команды SET будут вставлять в HashMap, а команды GET будут загружать их. Дополнительно мы будем использовать цикл для принятия более одной команды на соединение.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; async fn process(socket: TcpStream) { use mini_redis::Command::{self, Get, Set}; use std::collections::HashMap; // Для хранения данных используется хэш-карта let mut db = HashMap::new(); // Connection, предоставляемый `mini-redis`, обрабатывает разбор фреймов из сокета let mut connection = Connection::new(socket); // Используем `read_frame` для получения команды из соединения. while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { // Значение хранится как `Vec<u8>` db.insert(cmd.key().to_string(), cmd.value().to_vec()); Frame::Simple("OK".to_string()) } Get(cmd) => { if let Some(value) = db.get(cmd.key()) { // `Frame::Bulk` ожидает данные типа `Bytes`. Этот тип будет рассмотрен позже в руководстве. Пока `&Vec<u8>` преобразуется в `Bytes` с помощью `into()`. Frame::Bulk(value.clone().into()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // Записываем ответ клиенту connection.write_frame(&response).await.unwrap(); } } }
Теперь запустите сервер:
$ cargo run
и в отдельном окне терминала снова запустите пример клиента hello-redis:
$ cargo run --example hello-redis
Теперь вывод клиента будет:
got value from the server; result=Some(b"world")
Теперь мы можем получать и устанавливать значения, но есть проблема: значения не являются общими между соединениями. Если другой сокет подключится и попытается GET ключ hello, он ничего не найдет.
Полный код можно найти здесь.
В следующем разделе мы реализуем сохранение данных для всех сокетов.
Shared state
До сих пор у нас работал ключ-значение сервер. Однако есть серьезный недостаток: состояние не является общим между соединениями. Мы исправим это в этой статье.
Стратегии
Есть несколько различных способов разделения состояния в Tokio.
- Защита общего состояния с помощью Mutex.
- Создание задачи для управления состоянием и использование передачи сообщений для работы с ним.
Обычно вы хотите использовать первый подход для простых данных и второй подход для вещей, которые требуют асинхронной работы, таких как примитивы I/O. В этой главе общее состояние - это HashMap, а операции - insert и get. Ни одна из этих операций не является асинхронной, поэтому мы будем использовать Mutex.
Второй подход рассматривается в следующей главе.
Добавление зависимости bytes
Вместо использования Vec<u8>, крейт Mini-Redis использует Bytes из крейта bytes. Цель Bytes - предоставить надежную структуру байтового массива для сетевого программирования. Самая большая особенность, которую он добавляет по сравнению с Vec<u8>, - это поверхностное клонирование. Другими словами, вызов clone() на экземпляре Bytes не копирует базовые данные. Вместо этого экземпляр Bytes является ссылочно-считаемым handle к некоторым базовым данным. Тип Bytes примерно похож на Arc<Vec<u8>>, но с некоторыми дополнительными возможностями.
Чтобы добавить зависимость bytes, добавьте следующее в ваш Cargo.toml в раздел [dependencies]:
bytes = "1"
Инициализация HashMap
HashMap будет общим для многих задач и потенциально многих потоков. Для поддержки этого он обернут в Arc<Mutex<_>>.
Сначала для удобства добавьте следующий псевдоним типа после операторов use.
#![allow(unused)] fn main() { use bytes::Bytes; use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Db = Arc<Mutex<HashMap<String, Bytes>>>; }
Затем обновите функцию main для инициализации HashMap и передачи handle Arc в функцию process. Использование Arc позволяет на HashMap ссылаться конкурентно из многих задач, потенциально выполняющихся на многих потоках. В Tokio термин handle используется для ссылки на значение, которое предоставляет доступ к некоторому общему состоянию.
use tokio::net::TcpListener; use std::collections::HashMap; use std::sync::{Arc, Mutex}; fn dox() { #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); println!("Listening"); let db = Arc::new(Mutex::new(HashMap::new())); loop { let (socket, _) = listener.accept().await.unwrap(); // Клонируем handle к хэш-карте. let db = db.clone(); println!("Accepted"); tokio::spawn(async move { process(socket, db).await; }); } } } type Db = Arc<Mutex<HashMap<(), ()>>>; async fn process(_: tokio::net::TcpStream, _: Db) {}
О использовании std::sync::Mutex и tokio::sync::Mutex
Обратите внимание, что std::sync::Mutex и не tokio::sync::Mutex используется для защиты HashMap. Распространенная ошибка - безусловно использовать tokio::sync::Mutex в асинхронном коде. Асинхронный мьютекс - это мьютекс, который блокируется при вызовах .await.
Синхронный мьютекс будет блокировать текущий поток при ожидании получения блокировки. Это, в свою очередь, заблокирует обработку других задач. Однако переключение на tokio::sync::Mutex обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс внутри.
Как правило, использование синхронного мьютекса в асинхронном коде допустимо, пока конкуренция остается низкой и блокировка не удерживается при вызовах .await.
Обновление process()
Функция process больше не инициализирует HashMap. Вместо этого она принимает общий handle к HashMap в качестве аргумента. Также необходимо заблокировать HashMap перед использованием. Помните, что тип значения для HashMap теперь Bytes (который мы можем дешево клонировать), поэтому это также нужно изменить.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Db = Arc<Mutex<HashMap<String, bytes::Bytes>>>; async fn process(socket: TcpStream, db: Db) { use mini_redis::Command::{self, Get, Set}; // Connection, предоставляемый `mini-redis`, обрабатывает разбор фреймов из сокета let mut connection = Connection::new(socket); while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { let mut db = db.lock().unwrap(); db.insert(cmd.key().to_string(), cmd.value().clone()); Frame::Simple("OK".to_string()) } Get(cmd) => { let db = db.lock().unwrap(); if let Some(value) = db.get(cmd.key()) { Frame::Bulk(value.clone()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // Записываем ответ клиенту connection.write_frame(&response).await.unwrap(); } } }
Удержание MutexGuard через .await
Вы можете написать код, который выглядит так:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; do_something_async().await; } // lock выходит из области видимости здесь async fn do_something_async() {} }
Когда вы попытаетесь создать что-то, что вызывает эту функцию, вы столкнетесь со следующей ошибкой:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
Это происходит потому, что тип std::sync::MutexGuard не является Send. Это означает, что вы не можете отправить блокировку мьютекса в другой поток, и ошибка возникает потому, что среда выполнения Tokio может перемещать задачу между потоками при каждом .await. Чтобы избежать этого, вы должны переструктурировать ваш код так, чтобы деструктор блокировки мьютекса выполнялся до .await.
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; // Это работает! async fn increment_and_do_stuff(mutex: &Mutex<i32>) { { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; } // lock выходит из области видимости здесь do_something_async().await; } async fn do_something_async() {} }
Обратите внимание, что это не работает:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; // Это тоже не работает. async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; drop(lock); do_something_async().await; } async fn do_something_async() {} }
Это потому, что компилятор в настоящее время вычисляет, является ли future Send, на основе только информации об области видимости. Надеюсь, компилятор будет обновлен для поддержки явного удаления в будущем, но сейчас вы должны явно использовать область видимости.
Обратите внимание, что ошибка, обсуждаемая здесь, также обсуждается в разделе Send bound из главы о создании задач.
Вы не должны пытаться обойти эту проблему, создавая задачу способом, который не требует, чтобы она была Send, потому что если Tokio приостановит вашу задачу на .await, пока задача удерживает блокировку, другая задача может быть запланирована на выполнение в том же потоке, и эта другая задача также может попытаться заблокировать этот мьютекс, что приведет к взаимной блокировке, поскольку задача, ожидающая блокировки мьютекса, предотвратит освобождение мьютекса задачей, удерживающей мьютекс.
Имейте в виду, что некоторые крейты мьютексов реализуют Send для своих MutexGuards. В этом случае нет ошибки компилятора, даже если вы удерживаете MutexGuard через .await. Код компилируется, но он взаимно блокируется!
Мы обсудим некоторые подходы к избежанию этих проблем ниже:
Переструктурируйте ваш код, чтобы не удерживать блокировку через .await
Самый безопасный способ обработки мьютекса - это обернуть его в структуру и блокировать мьютекс только внутри не-асинхронных методов этой структуры.
#![allow(unused)] fn main() { use std::sync::Mutex; struct CanIncrement { mutex: Mutex<i32>, } impl CanIncrement { // Эта функция не помечена как async. fn increment(&self) { let mut lock = self.mutex.lock().unwrap(); *lock += 1; } } async fn increment_and_do_stuff(can_incr: &CanIncrement) { can_incr.increment(); do_something_async().await; } async fn do_something_async() {} }
Этот шаблон гарантирует, что вы не столкнетесь с ошибкой Send, потому что защита мьютекса не появляется нигде в асинхронной функции. Это также защищает вас от взаимных блокировок при использовании крейтов, чей MutexGuard реализует Send.
Вы можете найти более подробный пример в этом посте блога.
Создайте задачу для управления состоянием и используйте передачу сообщений для работы с ним
Это второй подход, упомянутый в начале этой главы, и он часто используется, когда общий ресурс является ресурсом I/O. Смотрите следующую главу для более подробной информации.
Используйте асинхронный мьютекс Tokio
Тип tokio::sync::Mutex, предоставляемый Tokio, также может быть использован. Основная особенность мьютекса Tokio заключается в том, что его можно удерживать через .await без каких-либо проблем. Тем не менее, асинхронный мьютекс дороже, чем обычный мьютекс, и обычно лучше использовать один из двух других подходов.
#![allow(unused)] fn main() { use tokio::sync::Mutex; //注意! Это использует мьютекс Tokio // Это компилируется! // (но переструктурирование кода было бы лучше в этом случае) async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock = mutex.lock().await; *lock += 1; do_something_async().await; } // lock выходит из области видимости здесь async fn do_something_async() {} }
Задачи, потоки и конкуренция
Использование блокирующего мьютекса для защиты коротких критических секций является приемлемой стратегией, когда конкуренция минимальна. Когда блокировка оспаривается, поток, выполняющий задачу, должен блокироваться и ждать мьютекс. Это не только заблокирует текущую задачу, но также заблокирует все другие задачи, запланированные на текущем потоке.
По умолчанию среда выполнения Tokio использует многопоточный планировщик. Задачи планируются на любом количестве потоков, управляемых средой выполнения. Если большое количество задач запланировано на выполнение и все они требуют доступа к мьютексу, то будет конкуренция. С другой стороны, если используется вариант среды выполнения current_thread, то мьютекс никогда не будет оспариваться.
информация Вариант среды выполнения
current_thread- это легковесная однопоточная среда выполнения. Это хороший выбор, когда создается только несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении синхронного API-моста поверх асинхронной клиентской библиотеки.
Если конкуренция на синхронном мьютексе становится проблемой, лучшим исправлением редко является переключение на мьютекс Tokio. Вместо этого варианты, которые следует рассмотреть, это:
- Позволить выделенной задаче управлять состоянием и использовать передачу сообщений.
- Шардировать мьютекс.
- Переструктурировать код, чтобы избежать мьютекса.
Шардирование мьютекса
В нашем случае, поскольку каждый ключ независим, шардирование мьютекса будет работать хорошо. Для этого вместо одного экземпляра Mutex<HashMap<_, _>> мы введем N различных экземпляров.
#![allow(unused)] fn main() { use std::collections::HashMap; use std::sync::{Arc, Mutex}; type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>; fn new_sharded_db(num_shards: usize) -> ShardedDb { let mut db = Vec::with_capacity(num_shards); for _ in 0..num_shards { db.push(Mutex::new(HashMap::new())); } Arc::new(db) } }
Затем поиск ячейки для любого заданного ключа становится двухэтапным процессом. Сначала ключ используется для идентификации, к какому шарду он принадлежит. Затем ключ ищется в HashMap.
#![allow(unused)] fn main() { let shard = db[hash(key) % db.len()].lock().unwrap(); shard.insert(key, value); }
Простая реализация, описанная выше, требует использования фиксированного числа шардов, и количество шардов не может быть изменено после создания шардированной карты.
Крейт dashmap предоставляет реализацию более сложной шардированной хэш-карты. Вы также можете посмотреть на такие реализации конкурентных хэш-таблиц, как leapfrog и flurry, последняя является портом структуры данных ConcurrentHashMap Java.
Прежде чем вы начнете использовать любой из этих крейтов, убедитесь, что вы структурировали свой код так, чтобы вы не могли удерживать MutexGuard через .await. Если вы этого не сделаете, у вас либо будут ошибки компилятора (в случае не-Send защит), либо ваш код взаимно заблокируется (в случае Send защит). Смотрите полный пример и больше контекста в этом посте блога.
Каналы
Теперь, когда мы немного узнали о параллелизме с Tokio, давайте применим это на стороне клиента. Поместим код сервера, который мы написали ранее, в отдельный бинарный файл:
$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs
и создадим новый бинарный файл, который будет содержать код клиента:
$ touch src/bin/client.rs
В этом файле вы будете писать код этой страницы. Когда вы захотите запустить его, вам нужно будет сначала запустить сервер в отдельном окне терминала:
$ cargo run --bin server
А затем клиент отдельно:
$ cargo run --bin client
Что ж, давайте программировать!
Допустим, мы хотим выполнить две параллельные команды Redis. Мы можем создать по одной задаче для каждой команды. Тогда две команды будут выполняться параллельно.
Сначала мы можем попробовать что-то вроде:
use mini_redis::client; #[tokio::main] async fn main() { // Устанавливаем соединение с сервером let mut client = client::connect("127.0.0.1:6379").await.unwrap(); // Создаем две задачи: одна получает ключ, другая устанавливает ключ let t1 = tokio::spawn(async { let res = client.get("foo").await; }); let t2 = tokio::spawn(async { client.set("foo", "bar".into()).await; }); t1.await.unwrap(); t2.await.unwrap(); }
Это не компилируется, потому что обе задачи должны как-то обращаться к client. Поскольку Client не реализует Copy, код не скомпилируется без некоторого кода для облегчения этого совместного использования. Кроме того, Client::set принимает &mut self, что означает, что требуется эксклюзивный доступ для его вызова. Мы могли бы открыть соединение для каждой задачи, но это не идеально. Мы не можем использовать std::sync::Mutex, так как .await нужно вызывать с удержанной блокировкой. Мы могли бы использовать tokio::sync::Mutex, но это позволило бы выполнять только один запрос одновременно. Если клиент реализует конвейеризацию (кратко: отправка многих команд без ожидания ответа на каждую предыдущую команду), асинхронный мьютекс приводит к неполному использованию соединения.
Передача сообщений
Ответ заключается в использовании передачи сообщений. Шаблон включает создание выделенной задачи для управления ресурсом client. Любая задача, которая хочет выполнить запрос, отправляет сообщение задаче client. Задача client выполняет запрос от имени отправителя, и ответ отправляется обратно отправителю.
Используя эту стратегию, устанавливается единственное соединение. Задача, управляющая client, может получить эксклюзивный доступ для вызова get и set. Кроме того, канал работает как буфер. Операции могут быть отправлены задаче client, пока она занята. Как только задача client становится доступной для обработки новых запросов, она извлекает следующий запрос из канала. Это может привести к лучшей пропускной способности и быть расширено для поддержки пула соединений.
Примитивы каналов Tokio
Tokio предоставляет несколько каналов, каждый из которых служит своей цели.
- mpsc: многопоточный отправитель, однопоточный получатель. Можно отправить много значений.
- oneshot: однопоточный отправитель, однопоточный получатель. Можно отправить одно значение.
- broadcast: многопоточный отправитель, многопоточный получатель. Можно отправить много значений. Каждый получатель видит каждое значение.
- watch: многопоточный отправитель, многопоточный получатель. Можно отправить много значений, но история не сохраняется. Получатели видят только последнее значение.
Если вам нужен многопоточный отправитель/многопоточный получатель канала, где только один получатель видит каждое сообщение, вы можете использовать крейт async-channel. Также существуют каналы для использования вне асинхронного Rust, такие как std::sync::mpsc и crossbeam::channel. Эти каналы ожидают сообщения, блокируя поток, что не допускается в асинхронном коде.
В этом разделе мы будем использовать mpsc и oneshot. Другие типы каналов передачи сообщений исследуются в последующих разделах. Полный код из этого раздела находится здесь.
Определение типа сообщения
В большинстве случаев, при использовании передачи сообщений, задача, получающая сообщения, отвечает более чем на одну команду. В нашем случае задача будет отвечать на команды GET и SET. Чтобы смоделировать это, мы сначала определим перечисление Command и включим вариант для каждого типа команды.
#![allow(unused)] fn main() { use bytes::Bytes; #[derive(Debug)] enum Command { Get { key: String, }, Set { key: String, val: Bytes, } } }
Создание канала
В функции main создается канал mpsc.
use tokio::sync::mpsc; #[tokio::main] async fn main() { // Создаем новый канал с емкостью не более 32. let (tx, mut rx) = mpsc::channel(32); tx.send(()).await.unwrap(); // ... Дальнейший код здесь }
Канал mpsc используется для отправки команд задаче, управляющей соединением redis. Возможность многопоточности позволяет отправлять сообщения из многих задач. Создание канала возвращает два значения: отправитель и получатель. Два handle используются отдельно. Они могут быть перемещены в разные задачи.
Канал создается с емкостью 32. Если сообщения отправляются быстрее, чем принимаются, канал будет их хранить. После того как в канале сохранено 32 сообщения, вызов send(...).await перейдет в режим ожидания, пока сообщение не будет удалено получателем.
Отправка из нескольких задач осуществляется путем клонирования Sender. Например:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); let tx2 = tx.clone(); tokio::spawn(async move { tx.send("sending from first handle").await.unwrap(); }); tokio::spawn(async move { tx2.send("sending from second handle").await.unwrap(); }); while let Some(message) = rx.recv().await { println!("GOT = {}", message); } }
Оба сообщения отправляются в единственный handle Receiver. Невозможно клонировать получателя канала mpsc.
Когда каждый Sender вышел из области видимости или иным образом был удален, больше невозможно отправлять сообщения в канал. В этот момент вызов recv на Receiver вернет None, что означает, что все отправители исчезли и канал закрыт.
В нашем случае задачи, управляющей соединением Redis, она знает, что может закрыть соединение Redis, когда канал закрыт, так как соединение больше не будет использоваться.
Создание задачи-менеджера
Далее создадим задачу, которая обрабатывает сообщения из канала. Сначала устанавливается клиентское соединение с Redis. Затем полученные команды выполняются через соединение Redis.
#![allow(unused)] fn main() { use mini_redis::client; enum Command { Get { key: String }, Set { key: String, val: bytes::Bytes } } async fn dox() { let (_, mut rx) = tokio::sync::mpsc::channel(10); // Ключевое слово `move` используется для **перемещения** владения `rx` в задачу. let manager = tokio::spawn(async move { // Устанавливаем соединение с сервером let mut client = client::connect("127.0.0.1:6379").await.unwrap(); // Начинаем получать сообщения while let Some(cmd) = rx.recv().await { use Command::*; match cmd { Get { key } => { client.get(&key).await; } Set { key, val } => { client.set(&key, val).await; } } } }); } }
Теперь обновим две задачи для отправки команд через канал вместо их непосредственного выполнения на соединении Redis.
#![allow(unused)] fn main() { #[derive(Debug)] enum Command { Get { key: String }, Set { key: String, val: bytes::Bytes } } async fn dox() { let (mut tx, _) = tokio::sync::mpsc::channel(10); // Handle `Sender` перемещаются в задачи. Поскольку есть две задачи, нам нужен второй `Sender`. let tx2 = tx.clone(); // Создаем две задачи: одна получает ключ, другая устанавливает ключ let t1 = tokio::spawn(async move { let cmd = Command::Get { key: "foo".to_string(), }; tx.send(cmd).await.unwrap(); }); let t2 = tokio::spawn(async move { let cmd = Command::Set { key: "foo".to_string(), val: "bar".into(), }; tx2.send(cmd).await.unwrap(); }); } }
Внизу функции main мы используем .await для join handles, чтобы гарантировать полное завершение команд перед выходом из процесса.
#![allow(unused)] fn main() { type Jh = tokio::task::JoinHandle<()>; async fn dox(t1: Jh, t2: Jh, manager: Jh) { t1.await.unwrap(); t2.await.unwrap(); manager.await.unwrap(); } }
Получение ответов
Последний шаг — получение ответа от задачи-менеджера. Команде GET нужно получить значение, а команде SET нужно знать, успешно ли завершилась операция.
Для передачи ответа используется канал oneshot. Канал oneshot — это канал с одним отправителем и одним получателем, оптимизированный для отправки одного значения. В нашем случае единственное значение — это ответ.
Аналогично mpsc, oneshot::channel() возвращает handle отправителя и получателя.
#![allow(unused)] fn main() { use tokio::sync::oneshot; async fn dox() { let (tx, rx) = oneshot::channel(); tx.send(()).unwrap(); } }
В отличие от mpsc, емкость не указывается, так как она всегда равна единице. Кроме того, ни один из handle не может быть клонирован.
Для получения ответов от задачи-менеджера, перед отправкой команды создается канал oneshot. Половина Sender канала включается в команду для задачи-менеджера. Половина получения используется для получения ответа.
Сначала обновим Command, чтобы включить Sender. Для удобства используется псевдоним типа для ссылки на Sender.
#![allow(unused)] fn main() { use tokio::sync::oneshot; use bytes::Bytes; /// Несколько различных команд мультиплексируются через один канал. #[derive(Debug)] enum Command { Get { key: String, resp: Responder<Option<Bytes>>, }, Set { key: String, val: Bytes, resp: Responder<()>, }, } /// Предоставляется запрашивающей стороной и используется задачей-менеджером для отправки ответа на команду обратно запрашивающей стороне. type Responder<T> = oneshot::Sender<mini_redis::Result<T>>; }
Теперь обновим задачи, выполняющие команды, чтобы включить oneshot::Sender.
#![allow(unused)] fn main() { use tokio::sync::{oneshot, mpsc}; use bytes::Bytes; #[derive(Debug)] enum Command { Get { key: String, resp: Responder<Option<bytes::Bytes>> }, Set { key: String, val: Bytes, resp: Responder<()> }, } type Responder<T> = oneshot::Sender<mini_redis::Result<T>>; fn dox() { let (mut tx, mut rx) = mpsc::channel(10); let mut tx2 = tx.clone(); let t1 = tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::Get { key: "foo".to_string(), resp: resp_tx, }; // Отправляем запрос GET tx.send(cmd).await.unwrap(); // Ожидаем ответ let res = resp_rx.await; println!("GOT = {:?}", res); }); let t2 = tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::Set { key: "foo".to_string(), val: "bar".into(), resp: resp_tx, }; // Отправляем запрос SET tx2.send(cmd).await.unwrap(); // Ожидаем ответ let res = resp_rx.await; println!("GOT = {:?}", res); }); } }
Наконец, обновим задачу-менеджер для отправки ответа через канал oneshot.
#![allow(unused)] fn main() { use tokio::sync::{oneshot, mpsc}; use bytes::Bytes; #[derive(Debug)] enum Command { Get { key: String, resp: Responder<Option<bytes::Bytes>> }, Set { key: String, val: Bytes, resp: Responder<()> }, } type Responder<T> = oneshot::Sender<mini_redis::Result<T>>; async fn dox(mut client: mini_redis::client::Client) { let (_, mut rx) = mpsc::channel::<Command>(10); while let Some(cmd) = rx.recv().await { match cmd { Command::Get { key, resp } => { let res = client.get(&key).await; // Игнорируем ошибки let _ = resp.send(res); } Command::Set { key, val, resp } => { let res = client.set(&key, val).await; // Игнорируем ошибки let _ = resp.send(res); } } } } }
Вызов send на oneshot::Sender завершается немедленно и не требует .await. Это связано с тем, что send на канале oneshot всегда завершается неудачей или успехом немедленно без какого-либо ожидания.
Отправка значения на канал oneshot возвращает Err, когда половина получателя была удалена. Это указывает, что получатель больше не заинтересован в ответе. В нашем сценарии отмена интереса получателем является допустимым событием. Ошибка Err, возвращаемая resp.send(...), не требует обработки.
Вы можете найти полный код здесь.
Обратное давление и ограниченные каналы
Всякий раз, когда вводится параллелизм или очереди, важно обеспечить, чтобы очереди были ограничены и система могла корректно обрабатывать нагрузку. Неограниченные очереди в конечном итоге заполнят всю доступную память и вызовут сбой системы непредсказуемым образом.
Tokio заботится о том, чтобы избежать неявного создания очередей. Большая часть этого заключается в том, что асинхронные операции ленивы. Рассмотрим следующее:
fn async_op() {} fn dox() { loop { async_op(); } } fn main() {}
Если асинхронная операция выполняется активно, цикл будет повторно ставить в очередь новую async_op для выполнения без обеспечения завершения предыдущей операции. Это приводит к неявному неограниченному созданию очередей. Системы на основе обратных вызовов и системы на основе активных future особенно подвержены этому.
Однако с Tokio и асинхронным Rust приведенный фрагмент не приведет к выполнению async_op вообще. Это потому, что .await никогда не вызывается. Если фрагмент обновлен для использования .await, то цикл ждет завершения операции перед началом нового цикла.
async fn async_op() {} async fn dox() { loop { // Не повторится, пока `async_op` не завершится async_op().await; } } fn main() {}
Параллелизм и очереди должны вводиться явно. Способы сделать это включают:
tokio::spawnselect!join!mpsc::channel
При этом позаботьтесь о том, чтобы общий объем параллелизма был ограничен. Например, при написании цикла принятия TCP убедитесь, что общее количество открытых сокетов ограничено. При использовании mpsc::channel выберите управляемую емкость канала. Конкретные значения границ будут зависеть от приложения.
Забота и выбор хороших границ — большая часть написания надежных приложений Tokio.
I/O
Ввод-вывод в Tokio работает во многом так же, как и в std, но асинхронно. Существует трейт для чтения (AsyncRead) и трейт для записи (AsyncWrite). Конкретные типы реализуют эти трейты соответствующим образом (TcpStream, File, Stdout). AsyncRead и AsyncWrite также реализованы для ряда структур данных, таких как Vec<u8> и &[u8]. Это позволяет использовать байтовые массивы там, где ожидается читатель или писатель.
На этой странице будут рассмотрены базовое чтение и запись ввода-вывода с Tokio и пройдены несколько примеров. На следующей странице мы перейдем к более продвинутому примеру ввода-вывода.
AsyncRead и AsyncWrite
Эти два трейта предоставляют возможности для асинхронного чтения из байтовых потоков и записи в них. Методы этих трейтов обычно не вызываются напрямую, аналогично тому, как вы не вызываете вручную метод poll из трейта Future. Вместо этого вы будете использовать их через вспомогательные методы, предоставляемые AsyncReadExt и AsyncWriteExt.
Давайте кратко рассмотрим несколько из этих методов. Все эти функции являются async и должны использоваться с .await.
async fn read()
AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращая количество прочитанных байтов.
Примечание: когда read() возвращает Ok(0), это означает, что поток закрыт. Любые последующие вызовы read() будут немедленно завершаться с Ok(0). Для экземпляров TcpStream это означает, что половина сокета для чтения закрыта.
use tokio::fs::File; use tokio::io::{self, AsyncReadExt}; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = [0; 10]; // читаем до 10 байт let n = f.read(&mut buffer[..]).await?; println!("The bytes: {:?}", &buffer[..n]); Ok(()) } }
async fn read_to_end()
AsyncReadExt::read_to_end читает все байты из потока до EOF.
use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = Vec::new(); // читаем весь файл f.read_to_end(&mut buffer).await?; Ok(()) } }
async fn write()
AsyncWriteExt::write записывает буфер в писатель, возвращая количество записанных байтов.
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; // Записывает некоторый префикс байтовой строки, но не обязательно всю. let n = file.write(b"some bytes").await?; println!("Wrote the first {} bytes of 'some bytes'.", n); Ok(()) } }
async fn write_all()
AsyncWriteExt::write_all записывает весь буфер в писатель.
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; file.write_all(b"some bytes").await?; Ok(()) } }
Оба трейта включают множество других полезных методов. См. документацию API для полного списка.
Вспомогательные функции
Кроме того, как и в std, модуль tokio::io содержит множество полезных утилитных функций, а также API для работы со стандартным вводом, стандартным выводом и стандартной ошибкой. Например, tokio::io::copy асинхронно копирует все содержимое читателя в писатель.
use tokio::fs::File; use tokio::io; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut reader: &[u8] = b"hello"; let mut file = File::create("foo.txt").await?; io::copy(&mut reader, &mut file).await?; Ok(()) } }
Обратите внимание, что здесь используется тот факт, что байтовые массивы также реализуют AsyncRead.
Эхо-сервер
Давайте попрактикуемся в асинхронном вводе-выводе. Мы напишем эхо-сервер.
Эхо-сервер привязывает TcpListener и принимает входящие соединения в цикле. Для каждого входящего соединения данные читаются из сокета и немедленно записываются обратно в сокет. Клиент отправляет данные на сервер и получает точно такие же данные обратно.
Мы реализуем эхо-сервер дважды, используя немного разные стратегии.
Использование io::copy()
Для начала мы реализуем логику эха с помощью утилиты io::copy.
Вы можете написать этот код в новом бинарном файле:
$ touch src/bin/echo-server-copy.rs
Который вы можете запустить (или просто проверить компиляцию) с помощью:
$ cargo run --bin echo-server-copy
Вы сможете испытать сервер, используя стандартное командное средство, такое как telnet, или написав простой клиент, подобный тому, который находится в документации для tokio::net::TcpStream.
Это TCP-сервер, и ему нужен цикл принятия. Для каждого принятого сокета создается новая задача.
use tokio::io; use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { // Копируем данные здесь }); } } }
Как было показано ранее, эта утилитная функция принимает читателя и писателя и копирует данные из одного в другой. Однако у нас есть только один TcpStream. Этот единственный значение реализует и AsyncRead, и AsyncWrite. Поскольку io::copy требует &mut как для читателя, так и для писателя, сокет не может быть использован для обоих аргументов.
#![allow(unused)] fn main() { // Это не компилируется io::copy(&mut socket, &mut socket).await }
Разделение читателя + писателя
Чтобы обойти эту проблему, мы должны разделить сокет на handle читателя и handle писателя. Лучший способ разделить комбинацию читатель/писатель зависит от конкретного типа.
Любой тип читатель + писатель может быть разделен с помощью утилиты io::split. Эта функция принимает одно значение и возвращает отдельные handle читателя и писателя. Эти два handle могут использоваться независимо, в том числе из отдельных задач.
Например, эхо-клиент может обрабатывать конкурентные чтение и запись следующим образом:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let socket = TcpStream::connect("127.0.0.1:6142").await?; let (mut rd, mut wr) = io::split(socket); // Записываем данные в фоне tokio::spawn(async move { wr.write_all(b"hello\r\n").await?; wr.write_all(b"world\r\n").await?; // Иногда rust выводу типов нужна небольшая помощь Ok::<_, io::Error>(()) }); let mut buf = vec![0; 128]; loop { let n = rd.read(&mut buf).await?; if n == 0 { break; } println!("GOT {:?}", &buf[..n]); } Ok(()) } }
Поскольку io::split поддерживает любое значение, которое реализует AsyncRead + AsyncWrite, и возвращает независимые handle, внутри io::split используется Arc и Mutex. Этих накладных расходов можно избежать с TcpStream. TcpStream предлагает две специализированные функции разделения.
TcpStream::split принимает ссылку на поток и возвращает handle читателя и писателя. Поскольку используется ссылка, оба handle должны оставаться в той же задаче, из которой был вызван split(). Это специализированное split является бесплатным. Не нужны Arc или Mutex. TcpStream также предоставляет into_split, который поддерживает handle, которые могут перемещаться между задачами, ценой только Arc.
Поскольку io::copy() вызывается в той же задаче, которой принадлежит TcpStream, мы можем использовать TcpStream::split. Задача, которая обрабатывает логику эха в сервере, становится:
#![allow(unused)] fn main() { use tokio::io; use tokio::net::TcpStream; fn dox(mut socket: TcpStream) { tokio::spawn(async move { let (mut rd, mut wr) = socket.split(); if io::copy(&mut rd, &mut wr).await.is_err() { eprintln!("failed to copy"); } }); } }
Полный код можно найти здесь.
Ручное копирование
Теперь давайте посмотрим, как мы бы написали эхо-сервер, копируя данные вручную. Для этого мы используем AsyncReadExt::read и AsyncWriteExt::write_all.
Полный эхо-сервер выглядит следующим образом:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = vec![0; 1024]; loop { match socket.read(&mut buf).await { // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта Ok(0) => return, Ok(n) => { // Копируем данные обратно в сокет if socket.write_all(&buf[..n]).await.is_err() { // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку. return; } } Err(_) => { // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку. return; } } } }); } } }
(Вы можете поместить этот код в src/bin/echo-server.rs и запустить его с помощью cargo run --bin echo-server).
Давайте разберем его. Во-первых, поскольку используются утилиты AsyncRead и AsyncWrite, трейты расширения должны быть внесены в область видимости.
#![allow(unused)] fn main() { use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; }
Выделение буфера
Стратегия заключается в том, чтобы прочитать некоторые данные из сокета в буфер, а затем записать содержимое буфера обратно в сокет.
#![allow(unused)] fn main() { let mut buf = vec![0; 1024]; }
Явно избегается стековый буфер. Вспомним из ранее, мы отметили, что все данные задачи, которые живут между вызовами .await, должны храниться задачей. В этом случае buf используется между вызовами .await. Все данные задачи хранятся в одном выделении памяти. Вы можете думать об этом как о enum, где каждый вариант - это данные, которые нужно хранить для конкретного вызова .await.
Если буфер представлен стековым массивом, внутренняя структура для задач, созданных для каждого принятого сокета, может выглядеть примерно так:
#![allow(unused)] fn main() { struct Task { // внутренние поля задачи здесь task: enum { AwaitingRead { socket: TcpStream, buf: [BufferType], }, AwaitingWriteAll { socket: TcpStream, buf: [BufferType], } } } }
Если стековый массив используется как тип буфера, он будет храниться встроенным в структуру задачи. Это сделает структуру задачи очень большой. Кроме того, размеры буферов часто равны размеру страницы. Это, в свою очередь, сделает Task неудобного размера: $page-size + несколько-байт.
Компилятор оптимизирует расположение async блоков дальше, чем базовый enum. На практике переменные не перемещаются между вариантами, как это было бы необходимо с enum. Однако размер структуры задачи по крайней мере такой же большой, как самая большая переменная.
Из-за этого обычно более эффективно использовать выделенное выделение памяти для буфера.
Обработка EOF
Когда половина TCP-потока для чтения закрыта, вызов read() возвращает Ok(0). Важно выйти из цикла чтения в этот момент. Забывание прервать цикл чтения при EOF является распространенным источником ошибок.
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use tokio::net::TcpStream; async fn dox(mut socket: TcpStream) { let mut buf = vec![0_u8; 1024]; loop { match socket.read(&mut buf).await { // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта Ok(0) => return, // ... другие случаи обрабатываются здесь _ => unreachable!(), } } } }
Забывание прервать цикл чтения обычно приводит к ситуации бесконечного цикла с 100% загрузкой CPU. Поскольку сокет закрыт, socket.read() возвращается немедленно. Цикл затем повторяется вечно.
Полный код можно найти здесь.
Framing
Теперь мы применим то, что только что узнали о вводе-выводе, и реализуем слой фрейминга для Mini-Redis. Фрейминг - это процесс преобразования байтового потока в поток фреймов. Фрейм - это единица данных, передаваемая между двумя узлами. Протокольный фрейм Redis определяется следующим образом:
#![allow(unused)] fn main() { use bytes::Bytes; enum Frame { Simple(String), Error(String), Integer(u64), Bulk(Bytes), Null, Array(Vec<Frame>), } }
Обратите внимание, что фрейм состоит только из данных без какой-либо семантики. Разбор команд и их реализация происходят на более высоком уровне.
Для HTTP фрейм может выглядеть так:
#![allow(unused)] fn main() { use bytes::Bytes; type Method = (); type Uri = (); type Version = (); type HeaderMap = (); type StatusCode = (); enum HttpFrame { RequestHead { method: Method, uri: Uri, version: Version, headers: HeaderMap, }, ResponseHead { status: StatusCode, version: Version, headers: HeaderMap, }, BodyChunk { chunk: Bytes, }, } }
Чтобы реализовать фрейминг для Mini-Redis, мы реализуем структуру Connection, которая оборачивает TcpStream и читает/пишет значения mini_redis::Frame.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Frame, Result}; struct Connection { stream: TcpStream, // ... другие поля здесь } impl Connection { /// Читает фрейм из соединения. /// /// Возвращает `None`, если достигнут EOF pub async fn read_frame(&mut self) -> Result<Option<Frame>> { // реализация здесь unimplemented!(); } /// Записывает фрейм в соединение. pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> { // реализация здесь unimplemented!(); } } }
Подробности протокола Redis можно найти здесь. Полный код Connection находится здесь.
Буферизованное чтение
Метод read_frame ожидает получения всего фрейма перед возвратом. Один вызов TcpStream::read() может вернуть произвольное количество данных. Это может содержать целый фрейм, частичный фрейм или несколько фреймов. Если получен частичный фрейм, данные буферизуются и считываются дополнительные данные из сокета. Если получено несколько фреймов, возвращается первый фрейм, а остальные данные буферизуются до следующего вызова read_frame.
Если вы еще этого не сделали, создайте новый файл с именем connection.rs.
touch src/connection.rs
Для реализации этого Connection нуждается в поле буфера чтения. Данные считываются из сокета в буфер чтения. Когда фрейм разбирается, соответствующие данные удаляются из буфера.
Мы будем использовать BytesMut в качестве типа буфера. Это изменяемая версия Bytes.
#![allow(unused)] fn main() { use bytes::BytesMut; use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // Выделяем буфер вместимостью 4 КБ. buffer: BytesMut::with_capacity(4096), } } } }
Далее мы реализуем метод read_frame().
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use bytes::Buf; use mini_redis::Result; struct Connection { stream: tokio::net::TcpStream, buffer: bytes::BytesMut, } struct Frame {} impl Connection { pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { // Попытка разобрать фрейм из буферизованных данных. Если // было буферизовано достаточно данных, фрейм возвращается. if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // Недостаточно буферизованных данных для чтения фрейма. // Попытка прочитать больше данных из сокета. // // При успехе возвращается количество байтов. `0` // указывает на "конец потока". if 0 == self.stream.read_buf(&mut self.buffer).await? { // Удаленная сторона закрыла соединение. Для чистого // завершения в буфере чтения не должно быть данных. // Если они есть, это означает, что узел закрыл сокет // во время отправки фрейма. if self.buffer.is_empty() { return Ok(None); } else { return Err("connection reset by peer".into()); } } } } fn parse_frame(&self) -> Result<Option<Frame>> { unimplemented!() } } }
Давайте разберем это. Метод read_frame работает в цикле. Сначала вызывается self.parse_frame(). Это попытается разобрать фрейм redis из self.buffer. Если есть достаточно данных для разбора фрейма, фрейм возвращается вызывающей стороне read_frame(). В противном случае мы пытаемся прочитать больше данных из сокета в буфер. После чтения дополнительных данных parse_frame() вызывается снова. На этот раз, если было получено достаточно данных, разбор может быть успешным.
При чтении из потока возвращаемое значение 0 указывает, что больше данных от узла получено не будет. Если в буфере чтения все еще есть данные, это указывает на то, что был получен частичный фрейм и соединение завершается внезапно. Это ошибочная ситуация, и возвращается Err.
Трейт Buf
При чтении из потока вызывается read_buf. Эта версия функции чтения принимает значение, реализующее BufMut из крейта bytes.
Сначала рассмотрим, как мы бы реализовали тот же цикл чтения с помощью read(). Вместо BytesMut можно использовать Vec<u8>.
#![allow(unused)] fn main() { use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // Выделяем буфер вместимостью 4 КБ. buffer: vec![0; 4096], cursor: 0, } } } }
И функция read_frame() на Connection:
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use tokio::io::AsyncReadExt; pub struct Connection { stream: tokio::net::TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // Убеждаемся, что буфер имеет емкость if self.buffer.len() == self.cursor { // Увеличиваем буфер self.buffer.resize(self.cursor * 2, 0); } // Читаем в буфер, отслеживая количество // прочитанных байтов let n = self.stream.read( &mut self.buffer[self.cursor..]).await?; if 0 == n { if self.cursor == 0 { return Ok(None); } else { return Err("connection reset by peer".into()); } } else { // Обновляем наш курсор self.cursor += n; } } } fn parse_frame(&mut self) -> Result<Option<Frame>> { unimplemented!() } } }
При работе с байтовыми массивами и read мы также должны поддерживать курсор, отслеживающий, сколько данных было буферизовано. Мы должны убедиться, что передаем пустую часть буфера в read(). В противном случае мы перезапишем буферизованные данные. Если наш буфер заполняется, мы должны увеличить буфер, чтобы продолжить чтение. В parse_frame() (не включено) нам нужно будет разбирать данные, содержащиеся в self.buffer[..self.cursor].
Поскольку сочетание байтового массива с курсором очень распространено, крейт bytes предоставляет абстракцию, представляющую байтовый массив и курсор. Трейт Buf реализуется типами, из которых можно читать данные. Трейт BufMut реализуется типами, в которые можно записывать данные. При передаче T: BufMut в read_buf() внутренний курсор буфера автоматически обновляется read_buf. Благодаря этому в нашей версии read_frame нам не нужно управлять собственным курсором.
Кроме того, при использовании Vec<u8> буфер должен быть инициализирован. vec![0; 4096] выделяет массив из 4096 байтов и записывает ноль в каждую запись. При изменении размера буфера новая емкость также должна быть инициализирована нулями. Процесс инициализации не бесплатен. При работе с BytesMut и BufMut емкость не инициализирована. Абстракция BytesMut предотвращает чтение неинициализированной памяти. Это позволяет нам избежать шага инициализации.
Разбор
Теперь давайте посмотрим на функцию parse_frame(). Разбор выполняется в два этапа.
- Убедиться, что буферизован полный фрейм, и найти конечный индекс фрейма.
- Разобрать фрейм.
Крейт mini-redis предоставляет нам функцию для обоих этих этапов:
Мы также будем повторно использовать абстракцию Buf для помощи. Buf передается в Frame::check. Поскольку функция check перебирает переданный буфер, внутренний курсор будет продвигаться. Когда check возвращается, внутренний курсор буфера указывает на конец фрейма.
Для типа Buf мы будем использовать std::io::Cursor<&[u8]>.
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use mini_redis::frame::Error::Incomplete; use bytes::Buf; use std::io::Cursor; pub struct Connection { stream: tokio::net::TcpStream, buffer: bytes::BytesMut, } impl Connection { fn parse_frame(&mut self) -> Result<Option<Frame>> { // Создаем тип `T: Buf`. let mut buf = Cursor::new(&self.buffer[..]); // Проверяем, доступен ли полный фрейм match Frame::check(&mut buf) { Ok(_) => { // Получаем длину фрейма в байтах let len = buf.position() as usize; // Сбрасываем внутренний курсор для // вызова `parse`. buf.set_position(0); // Разбираем фрейм let frame = Frame::parse(&mut buf)?; // Удаляем фрейм из буфера self.buffer.advance(len); // Возвращаем фрейм вызывающей стороне. Ok(Some(frame)) } // Недостаточно данных было буферизовано Err(Incomplete) => Ok(None), // Произошла ошибка Err(e) => Err(e.into()), } } } }
Полную функцию Frame::check можно найти здесь. Мы не будем рассматривать ее полностью.
Важно отметить, что используются API Buf в стиле "байтового итератора". Они извлекают данные и продвигают внутренний курсор. Например, чтобы разобрать фрейм, проверяется первый байт, чтобы определить тип фрейма. Используемая функция - Buf::get_u8. Это извлекает байт в текущей позиции курсора и продвигает курсор на единицу.
Есть еще полезные методы в трейте Buf. Проверьте документацию API для более подробной информации.
Буферизованная запись
Другая половина API фрейминга - это функция write_frame(frame). Эта функция записывает целый фрейм в сокет. Чтобы минимизировать системные вызовы write, записи будут буферизоваться. Поддерживается буфер записи, и фреймы кодируются в этот буфер перед записью в сокет. Однако, в отличие от read_frame(), весь фрейм не всегда буферизуется в байтовый массив перед записью в сокет.
Рассмотрим фрейм массовой передачи. Записываемое значение - Frame::Bulk(Bytes). Проводной формат массового фрейма - это заголовок фрейма, состоящий из символа $, за которым следует длина данных в байтах. Большая часть фрейма - это содержимое значения Bytes. Если данные большие, копирование их в промежуточный буфер будет дорогостоящим.
Для реализации буферизованной записи мы будем использовать BufWriter struct. Эта структура инициализируется с T: AsyncWrite и сама реализует AsyncWrite. Когда write вызывается на BufWriter, запись идет не напрямую к внутреннему писателю, а в буфер. Когда буфер заполняется, содержимое сбрасывается во внутренний писатель, и внутренний буфер очищается. Также есть оптимизации, позволяющие обходить буфер в определенных случаях.
Мы не будем пытаться выполнить полную реализацию write_frame() в рамках руководства. См. полную реализацию здесь.
Сначала обновляется структура Connection:
#![allow(unused)] fn main() { use tokio::io::BufWriter; use tokio::net::TcpStream; use bytes::BytesMut; pub struct Connection { stream: BufWriter<TcpStream>, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream: BufWriter::new(stream), buffer: BytesMut::with_capacity(4096), } } } }
Далее реализуется write_frame().
#![allow(unused)] fn main() { use tokio::io::{self, AsyncWriteExt}; use mini_redis::Frame; struct Connection { stream: tokio::io::BufWriter<tokio::net::TcpStream>, buffer: bytes::BytesMut, } impl Connection { async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { match frame { Frame::Simple(val) => { self.stream.write_u8(b'+').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Error(val) => { self.stream.write_u8(b'-').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Integer(val) => { self.stream.write_u8(b':').await?; self.write_decimal(*val).await?; } Frame::Null => { self.stream.write_all(b"$-1\r\n").await?; } Frame::Bulk(val) => { let len = val.len(); self.stream.write_u8(b'$').await?; self.write_decimal(len as u64).await?; self.stream.write_all(val).await?; self.stream.write_all(b"\r\n").await?; } Frame::Array(_val) => unimplemented!(), } self.stream.flush().await; Ok(()) } async fn write_decimal(&mut self, val: u64) -> io::Result<()> { unimplemented!() } } }
Используемые здесь функции предоставляются AsyncWriteExt. Они доступны и на TcpStream, но не рекомендуется выполнять однобайтовые записи без промежуточного буфера.
write_u8записывает один байт в писатель.write_allзаписывает весь срез в писатель.write_decimalреализована mini-redis.
Функция завершается вызовом self.stream.flush().await. Поскольку BufWriter хранит записи в промежуточном буфере, вызовы write не гарантируют, что данные записаны в сокет. Перед возвратом мы хотим, чтобы фрейм был записан в сокет. Вызов flush() записывает любые данные, ожидающие в буфере, в сокет.
Другая альтернатива - не вызывать flush() в write_frame(). Вместо этого предоставить функцию flush() на Connection. Это позволило бы вызывающей стороне записать несколько маленьких фреймов в буфер записи, а затем записать их все в сокет одним системным вызовом write. Это усложняет API Connection. Простота - одна из целей Mini-Redis, поэтому мы решили включить вызов flush().await в fn write_frame().
Async в грубину
На этом этапе мы завершили довольно полный обзор асинхронного Rust и Tokio. Теперь мы углубимся в модель асинхронной выполнения Rust. В самом начале руководства мы намекали, что асинхронный Rust использует уникальный подход. Теперь мы объясним, что это означает.
Future (Футуры)
В качестве краткого повторения рассмотрим очень простую асинхронную функцию. Это ничего нового по сравнению с тем, что уже было рассмотрено в руководстве.
#![allow(unused)] fn main() { use tokio::net::TcpStream; async fn my_async_fn() { println!("hello from async"); let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap(); println!("async TCP operation complete"); } }
Мы вызываем функцию, и она возвращает некоторое значение. Мы вызываем .await для этого значения.
async fn my_async_fn() {} #[tokio::main] async fn main() { let what_is_this = my_async_fn(); // Пока ничего не напечатано. what_is_this.await; // Текст напечатан, сокет установлен и закрыт. }
Значение, возвращаемое my_async_fn(), является future. Future — это значение, которое реализует трейт std::future::Future, предоставляемый стандартной библиотекой. Это значения, которые содержат выполняющуюся асинхронную операцию.
Определение трейта std::future::Future:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; } }
Связанный тип Output — это тип, который future производит после завершения. Тип Pin — это то, как Rust может поддерживать заимствования в async функциях. Подробнее см. в документации стандартной библиотеки.
В отличие от того, как future реализованы в других языках, future в Rust не представляет вычисление, происходящее в фоновом режиме, скорее future в Rust и есть само вычисление. Владелец future отвечает за продвижение вычисления путем опрашивания future. Это делается вызовом Future::poll.
Реализация Future
Давайте реализуем очень простой future. Этот future будет:
- Ждать до определенного момента времени.
- Выводить текст в STDOUT.
- Возвращать строку.
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Пока игнорируем эту строку. cx.waker().wake_by_ref(); Poll::Pending } } } #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }
Async fn как Future
В главной функции мы создаем future и вызываем для него .await. В асинхронных функциях мы можем вызывать .await для любого значения, реализующего Future. В свою очередь, вызов async функции возвращает анонимный тип, реализующий Future. В случае async fn main(), сгенерированный future примерно такой:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; enum MainFuture { // Инициализирован, никогда не опрашивался State0, // Ожидание `Delay`, т.е. строка `future.await`. State1(Delay), // Future завершен. Terminated, } struct Delay { when: Instant }; impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<&'static str> { unimplemented!(); } } impl Future for MainFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { use MainFuture::*; loop { match *self { State0 => { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; *self = State1(future); } State1(ref mut my_future) => { match Pin::new(my_future).poll(cx) { Poll::Ready(out) => { assert_eq!(out, "done"); *self = Terminated; return Poll::Ready(()); } Poll::Pending => { return Poll::Pending; } } } Terminated => { panic!("future polled after completion") } } } } } }
Future в Rust являются машинами состояний. Здесь MainFuture представлен как enum возможных состояний future. Future начинается в состоянии State0. Когда вызывается poll, future пытается продвинуть свое внутреннее состояние как можно дальше. Если future может завершиться, возвращается Poll::Ready, содержащий результат асинхронного вычисления.
Если future не может завершиться, обычно из-за того, что ресурсы, которых он ожидает, не готовы, то возвращается Poll::Pending. Получение Poll::Pending указывает вызывающей стороне, что future завершится позже и вызывающая сторона должна снова вызвать poll позже.
Мы также видим, что future состоят из других future. Вызов poll на внешнем future приводит к вызову poll внутреннего future.
Исполнители
Асинхронные функции Rust возвращают future. Для future должен вызываться poll, чтобы продвигать их состояние. Future состоят из других future. Итак, вопрос: что вызывает poll на самом внешнем future?
Вспомним, что для запуска асинхронных функций они должны быть либо переданы в tokio::spawn, либо быть главной функцией с аннотацией #[tokio::main]. Это приводит к отправке сгенерированного внешнего future в исполнитель Tokio. Исполнитель отвечает за вызов Future::poll на внешнем future, продвигая асинхронное вычисление к завершению.
Mini Tokio
Чтобы лучше понять, как все это сочетается, давайте реализуем нашу собственную минимальную версию Tokio! Полный код можно найти здесь.
use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use futures::task; fn main() { let mut mini_tokio = MiniTokio::new(); mini_tokio.spawn(async { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }); mini_tokio.run(); } struct Delay { when: Instant } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<&'static str> { Poll::Ready("done") } } struct MiniTokio { tasks: VecDeque<Task>, } type Task = Pin<Box<dyn Future<Output = ()> + Send>>; impl MiniTokio { fn new() -> MiniTokio { MiniTokio { tasks: VecDeque::new(), } } /// Создать задачу в экземпляре mini-tokio. fn spawn<F>(&mut self, future: F) where F: Future<Output = ()> + Send + 'static, { self.tasks.push_back(Box::pin(future)); } fn run(&mut self) { let waker = task::noop_waker(); let mut cx = Context::from_waker(&waker); while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { self.tasks.push_back(task); } } } }
Это запускает async-блок. Создается экземпляр Delay с запрошенной задержкой и ожидается. Однако наша реализация пока имеет серьезный недостаток. Наш исполнитель никогда не переходит в режим ожидания. Исполнитель непрерывно циклически опрашивает все созданные future. Большую часть времени future не будут готовы к выполнению работы и снова вернут Poll::Pending. Процесс будет потреблять циклы CPU и в целом будет не очень эффективным.
В идеале мы хотим, чтобы mini-tokio опрашивал future только тогда, когда future может продолжить работу. Это происходит, когда ресурс, которого ожидает задача, становится готовым к выполнению запрошенной операции. Если задача хочет читать данные из TCP-сокета, то мы хотим опрашивать задачу только тогда, когда TCP-сокет получил данные. В нашем случае задача ожидает достижения заданного Instant. В идеале mini-tokio должен опрашивать задачу только после того, как этот момент времени наступит.
Для достижения этого, когда ресурс опрашивается и ресурс не готов, ресурс отправит уведомление, когда перейдет в готовое состояние.
Wakers (Побудители)
Wakers — это недостающий элемент. Это система, с помощью которой ресурс может уведомлять ожидающую задачу о том, что ресурс стал готов к продолжению операции.
Давайте снова посмотрим на определение Future::poll:
#![allow(unused)] fn main() { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; }
Аргумент Context в poll имеет метод waker(). Этот метод возвращает Waker, связанный с текущей задачей. Waker имеет метод wake(). Вызов этого метода сигнализирует исполнителю, что связанная задача должна быть запланирована на выполнение. Ресурсы вызывают wake(), когда переходят в готовое состояние, чтобы уведомить исполнителя, что опрос задачи сможет продвинуться.
Обновление Delay
Мы можем обновить Delay для использования wakers:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use std::thread; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Получаем handle к waker текущей задачи let waker = cx.waker().clone(); let when = self.when; // Создаем поток таймера. thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } waker.wake(); }); Poll::Pending } } } }
Теперь, когда запрошенная длительность истекла, вызывающая задача уведомляется, и исполнитель может обеспечить повторное планирование задачи. Следующий шаг — обновить mini-tokio для прослушивания уведомлений пробуждения.
В нашей реализации Delay все еще осталось несколько проблем. Мы исправим их позже.
предупреждение Когда future возвращает
Poll::Pending, он должен обеспечить, чтобы waker был сигнализирован в какой-то момент. Забывание этого приводит к зависанию задачи на неопределенное время.Забывание разбудить задачу после возврата
Poll::Pendingявляется распространенным источником ошибок.
Вспомним первую итерацию Delay. Вот была реализация future:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; struct Delay { when: Instant } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Пока игнорируем эту строку. cx.waker().wake_by_ref(); Poll::Pending } } } }
Перед возвратом Poll::Pending мы вызвали cx.waker().wake_by_ref(). Это необходимо для соблюдения контракта future. Возвращая Poll::Pending, мы отвечаем за сигнализацию waker. Поскольку мы еще не реализовали поток таймера, мы сигнализировали waker немедленно. Это приведет к немедленному повторному планированию future, повторному выполнению и, вероятно, он еще не будет готов к завершению.
Обратите внимание, что разрешается сигнализировать waker чаще, чем необходимо. В этом конкретном случае мы сигнализируем waker, даже если совсем не готовы продолжить операцию. С этим нет ничего плохого, кроме некоторых потраченных впустую циклов CPU. Однако эта конкретная реализация приведет к активному циклу.
Обновление Mini Tokio
Следующий шаг — обновление Mini Tokio для получения уведомлений waker. Мы хотим, чтобы исполнитель запускал задачи только тогда, когда они пробуждаются, и для этого Mini Tokio будет предоставлять свой собственный waker. Когда waker вызывается, его связанная задача ставится в очередь на выполнение. Mini-Tokio передает этот waker future при его опросе.
Обновленный Mini Tokio будет использовать канал для хранения запланированных задач. Каналы позволяют задачам ставиться в очередь на выполнение из любого потока. Wakers должны быть Send и Sync.
информация Трейты
SendиSync— это маркерные трейты, связанные с параллелизмом, предоставляемые Rust. Типы, которые могут быть отправлены в другой поток, являютсяSend. Большинство типов являютсяSend, но, например,Rc— нет. Типы, к которым можно получить параллельный доступ через неизменяемые ссылки, являютсяSync. Тип может бытьSend, но неSync— хороший примерCell, который можно изменять через неизменяемую ссылку и, следовательно, небезопасно обращаться к нему параллельно.Для получения более подробной информации см. соответствующую главу в книге Rust.
Обновим структуру MiniTokio.
#![allow(unused)] fn main() { use std::sync::mpsc; use std::sync::Arc; struct MiniTokio { scheduled: mpsc::Receiver<Arc<Task>>, sender: mpsc::Sender<Arc<Task>>, } struct Task { // Это будет заполнено вскоре. } }
Wakers являются Sync и могут быть клонированы. Когда вызывается wake, задача должна быть запланирована на выполнение. Для реализации этого у нас есть канал. Когда wake() вызывается на waker, задача помещается в передающую половину канала. Наша структура Task будет реализовывать логику пробуждения. Для этого ей нужно содержать как созданный future, так и половину канала для отправки. Мы помещаем future в структуру TaskFuture вместе с Poll enum для отслеживания последнего результата Future::poll(), что необходимо для обработки ложных пробуждений. Более подробная информация приведена в реализации метода poll() в TaskFuture.
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::sync::mpsc; use std::task::Poll; use std::sync::{Arc, Mutex}; /// Структура, содержащая future и результат последнего вызова его метода `poll`. struct TaskFuture { future: Pin<Box<dyn Future<Output = ()> + Send>>, poll: Poll<()>, } struct Task { // `Mutex` нужен, чтобы `Task` реализовывал `Sync`. Только один поток обращается к `task_future` в любой given время. // `Mutex` не требуется для корректности. Настоящий Tokio не использует мьютекс здесь, но у настоящего Tokio больше строк кода, чем может поместиться на одной странице руководства. task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, } impl Task { fn schedule(self: &Arc<Self>) { self.executor.send(self.clone()); } } }
Чтобы запланировать задачу, Arc клонируется и отправляется через канал. Теперь нам нужно связать нашу функцию schedule с std::task::Waker. Стандартная библиотека предоставляет низкоуровневый API для этого с использованием ручного построения vtable. Эта стратегия обеспечивает максимальную гибкость для реализаторов, но требует кучу небезопасного шаблонного кода. Вместо использования RawWakerVTable напрямую, мы будем использовать утилиту ArcWake, предоставляемую крейтом futures. Это позволяет нам реализовать простой трейт, чтобы представить нашу структуру Task как waker.
Добавьте следующую зависимость в ваш Cargo.toml, чтобы подключить futures.
futures = "0.3"
Затем реализуйте futures::task::ArcWake.
#![allow(unused)] fn main() { use futures::task::{self, ArcWake}; use std::sync::Arc; struct Task {} impl Task { fn schedule(self: &Arc<Self>) {} } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { arc_self.schedule(); } } }
Когда поток таймера выше вызывает waker.wake(), задача помещается в канал. Далее мы реализуем получение и выполнение задач в функции MiniTokio::run().
#![allow(unused)] fn main() { use std::sync::mpsc; use futures::task::{self, ArcWake}; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; struct MiniTokio { scheduled: mpsc::Receiver<Arc<Task>>, sender: mpsc::Sender<Arc<Task>>, } struct TaskFuture { future: Pin<Box<dyn Future<Output = ()> + Send>>, poll: Poll<()>, } struct Task { task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) {} } impl MiniTokio { fn run(&self) { while let Ok(task) = self.scheduled.recv() { task.poll(); } } /// Инициализировать новый экземпляр mini-tokio. fn new() -> MiniTokio { let (sender, scheduled) = mpsc::channel(); MiniTokio { scheduled, sender } } /// Создать задачу в экземпляре mini-tokio. /// /// Данный future оборачивается в оболочку `Task` и помещается в очередь `scheduled`. /// Future будет выполнен при вызове `run`. fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { Task::spawn(future, &self.sender); } } impl TaskFuture { fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture { TaskFuture { future: Box::pin(future), poll: Poll::Pending, } } fn poll(&mut self, cx: &mut Context<'_>) { // Ложные пробуждения разрешены, даже после того, как future вернул `Ready`. Однако опрос future, который уже вернул `Ready`, *не* допускается. По этой причине нам нужно проверить, что future все еще находится в состоянии pending, прежде чем вызывать его. Невыполнение этого может привести к панике. if self.poll.is_pending() { self.poll = self.future.as_mut().poll(cx); } } } impl Task { fn poll(self: Arc<Self>) { // Создаем waker из экземпляра `Task`. Это использует реализацию `ArcWake` сверху. let waker = task::waker(self.clone()); let mut cx = Context::from_waker(&waker); // Никакой другой поток никогда не пытается заблокировать task_future let mut task_future = self.task_future.try_lock().unwrap(); // Опрашиваем внутренний future task_future.poll(&mut cx); } // Создает новую задачу с данным future. // // Инициализирует новую оболочку Task, содержащую данный future, и помещает ее в `sender`. Принимающая половина канала получит задачу и выполнит ее. fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { task_future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), }); let _ = sender.send(task); } } }
Здесь происходит несколько вещей. Во-первых, реализована MiniTokio::run(). Функция работает в цикле, получая запланированные задачи из канала. Когда задачи помещаются в канал при их пробуждении, эти задачи могут продвигаться при выполнении.
Кроме того, функции MiniTokio::new() и MiniTokio::spawn() adjusted для использования канала вместо VecDeque. Когда создаются новые задачи, им передается клон передающей части канала, который задача может использовать для планирования себя в среде выполнения.
Функция Task::poll() создает waker с помощью утилиты ArcWake из крейта futures. Waker используется для создания task::Context. Этот task::Context передается в poll.
Итог
Теперь мы увидели сквозной пример того, как работает асинхронный Rust. Функция async/await Rust основана на трейтах. Это позволяет сторонним крейтам, таким как Tokio, предоставлять детали выполнения.
- Асинхронные операции Rust ленивы и требуют, чтобы вызывающая сторона опрашивала их.
- Wakers передаются future для связи future с задачей, вызывающей его.
- Когда ресурс не готов завершить операцию, возвращается
Poll::Pendingи записывается waker задачи. - Когда ресурс становится готовым, waker задачи уведомляется.
- Исполнитель получает уведомление и планирует задачу на выполнение.
- Задача снова опрашивается, на этот раз ресурс готов и задача продвигается.
Несколько незавершенных моментов
Вспомним, когда мы реализовывали future Delay, мы сказали, что есть еще несколько вещей для исправления. Асинхронная модель Rust позволяет одному future мигрировать между задачами во время выполнения. Рассмотрим следующее:
use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let mut delay = Some(Delay { when }); poll_fn(move |cx| { let mut delay = delay.take().unwrap(); let res = Pin::new(&mut delay).poll(cx); assert!(res.is_pending()); tokio::spawn(async move { delay.await; }); Poll::Ready(()) }).await; }
Функция poll_fn создает экземпляр Future с помощью замыкания. Приведенный фрагмент создает экземпляр Delay, опрашивает его один раз, затем отправляет экземпляр Delay в новую задачу, где он ожидается. В этом примере Delay::poll вызывается более одного раза с разными экземплярами Waker. Когда это происходит, вы должны убедиться, что вызывается wake на Waker, переданном в самый последний вызов poll.
При реализации future критически важно предполагать, что каждый вызов poll может предоставить другой экземпляр Waker. Функция poll должна обновлять любой ранее записанный waker новым.
Наша предыдущая реализация Delay создавала новый поток каждый раз, когда он опрашивался. Это нормально, но может быть очень неэффективно, если он опрашивается слишком часто (например, если вы используете select! над этим future и другим future, оба опрашиваются, когда у любого из них есть событие). Один из подходов к этому — запомнить, создали ли вы уже поток, и создавать новый поток только если вы еще не создали его. Однако если вы делаете это, вы должны обеспечить, чтобы Waker потока обновлялся при последующих вызовах poll, иначе вы не будите самый последний Waker.
Чтобы исправить нашу предыдущую реализацию, мы могли бы сделать что-то вроде этого:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant}; struct Delay { when: Instant, // Это Some, когда мы создали поток, и None otherwise. waker: Option<Arc<Mutex<Waker>>>, } impl Future for Delay { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { // Проверяем текущий момент времени. Если длительность истекла, то этот future завершился, поэтому мы возвращаем `Poll::Ready`. if Instant::now() >= self.when { return Poll::Ready(()); } // Длительность не истекла. Если это первый вызов future, создаем поток таймера. Если поток таймера уже запущен, обеспечиваем, чтобы сохраненный `Waker` соответствовал waker текущей задачи. if let Some(waker) = &self.waker { let mut waker = waker.lock().unwrap(); // Проверяем, соответствует ли сохраненный waker waker текущей задачи. // Это необходимо, поскольку экземпляр future `Delay` может переместиться в другую задачу между вызовами `poll`. Если это происходит, waker, содержащийся в данном `Context`, будет отличаться, и мы должны обновить наш сохраненный waker, чтобы отразить это изменение. if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } } else { let when = self.when; let waker = Arc::new(Mutex::new(cx.waker().clone())); self.waker = Some(waker.clone()); // Это первый раз, когда вызывается `poll`, создаем поток таймера. thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } // Длительность истекла. Уведомляем вызывающую сторону, вызывая waker. let waker = waker.lock().unwrap(); waker.wake_by_ref(); }); } // К этому моменту waker сохранен и поток таймера запущен. // Длительность не истекла (напомним, что мы проверили это в первую очередь), следовательно, future не завершился, поэтому мы должны вернуть `Poll::Pending`. // // Контракт трейта `Future` требует, чтобы при возврате `Pending` future обеспечивал, чтобы данный waker был сигнализирован, когда future должен быть опрошен снова. В нашем случае, возвращая `Pending` здесь, мы обещаем, что мы вызовем данный waker, включенный в аргумент `Context`, once запрошенная длительность истечет. Мы обеспечиваем это, создавая поток таймера выше. // // Если мы забудем вызвать waker, задача зависнет на неопределенное время. Poll::Pending } } }
Это немного сложно, но идея в том, что при каждом вызове poll future проверяет, соответствует ли предоставленный waker ранее записанному waker. Если два waker совпадают, то больше ничего делать не нужно. Если они не совпадают, то записанный waker должен быть обновлен.
Утилита Notify
Мы продемонстрировали, как future Delay может быть реализован вручную с использованием wakers. Wakers являются основой того, как работает асинхронный Rust. Обычно нет необходимости опускаться до этого уровня. Например, в случае Delay мы могли бы реализовать его полностью с помощью async/await, используя утилиту tokio::sync::Notify. Эта утилита предоставляет базовый механизм уведомления задач. Она обрабатывает детали wakers, включая обеспечение того, что записанный waker соответствует текущей задаче.
Используя Notify, мы можем реализовать функцию delay с помощью async/await следующим образом:
#![allow(unused)] fn main() { use tokio::sync::Notify; use std::sync::Arc; use std::time::{Duration, Instant}; use std::thread; async fn delay(dur: Duration) { let when = Instant::now() + dur; let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } notify_clone.notify_one(); }); notify.notified().await; } }
Select
До сих пор, когда мы хотели добавить параллелизм в систему, мы создавали новую задачу. Теперь мы рассмотрим некоторые дополнительные способы параллельного выполнения асинхронного кода с Tokio.
tokio::select!
Макрос tokio::select! позволяет ожидать выполнения нескольких асинхронных вычислений и возвращает результат, когда завершается одно из вычислений.
Например:
use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); tokio::spawn(async { let _ = tx1.send("one"); }); tokio::spawn(async { let _ = tx2.send("two"); }); tokio::select! { val = rx1 => { println!("rx1 completed first with {:?}", val); } val = rx2 => { println!("rx2 completed first with {:?}", val); } } }
Используются два oneshot-канала. Любой из каналов может завершиться первым. Оператор select! ожидает оба канала и связывает val со значением, возвращенным задачей. Когда либо tx1, либо tx2 завершаются, выполняется соответствующий блок.
Ветка, которая не завершается, отбрасывается. В примере вычисление ожидает oneshot::Receiver для каждого канала. oneshot::Receiver для канала, который еще не завершился, отбрасывается.
Отмена
В асинхронном Rust отмена выполняется путем отбрасывания future. Вспомним из "Асинхронность в глубине", что асинхронные операции Rust реализуются с использованием future, и future ленивы. Операция продолжается только тогда, когда future опрашивается. Если future отбрасывается, операция не может продолжаться, потому что все связанное состояние отбрасывается.
Тем не менее, иногда асинхронная операция будет создавать фоновые задачи или запускать другие операции, которые выполняются в фоне. Например, в приведенном выше примере создается задача для отправки сообщения обратно. Обычно задача выполняет некоторые вычисления для генерации значения.
Future или другие типы могут реализовывать Drop для очистки фоновых ресурсов. oneshot::Receiver Tokio реализует Drop, отправляя уведомление о закрытии половине Sender. Половина отправителя может получить это уведомление и прервать выполняющуюся операцию, отбросив ее.
use tokio::sync::oneshot; async fn some_operation() -> String { // Вычисляем значение здесь "wut".to_string() } #[tokio::main] async fn main() { let (mut tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); tokio::spawn(async { // Выбираем между операцией и уведомлением // `closed()` oneshot-канала. tokio::select! { val = some_operation() => { let _ = tx1.send(val); } _ = tx1.closed() => { // `some_operation()` отменена, задача // завершается и `tx1` отбрасывается. } } }); tokio::spawn(async { let _ = tx2.send("two"); }); tokio::select! { val = rx1 => { println!("rx1 completed first with {:?}", val); } val = rx2 => { println!("rx2 completed first with {:?}", val); } } }
Реализация Future
Чтобы лучше понять, как работает select!, давайте посмотрим, как могла бы выглядеть гипотетическая реализация Future. Это упрощенная версия. На практике select! включает дополнительную функциональность, такую как случайный выбор ветки для опроса первой.
use tokio::sync::oneshot; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; struct MySelect { rx1: oneshot::Receiver<&'static str>, rx2: oneshot::Receiver<&'static str>, } impl Future for MySelect { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) { println!("rx1 completed first with {:?}", val); return Poll::Ready(()); } if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) { println!("rx2 completed first with {:?}", val); return Poll::Ready(()); } Poll::Pending } } #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); // используем tx1 и tx2 tx1.send("one").unwrap(); tx2.send("two").unwrap(); MySelect { rx1, rx2, }.await; }
Future MySelect содержит future из каждой ветки. Когда MySelect опрашивается, опрашивается первая ветка. Если она готова, значение используется и MySelect завершается. После того как .await получает вывод из future, future отбрасывается. Это приводит к отбрасыванию future для обеих веток. Поскольку одна ветка не завершилась, операция эффективно отменяется.
Помните из предыдущего раздела:
Когда future возвращает
Poll::Pending, он должен обеспечить, чтобы waker был сигнализирован в какой-то момент в будущем. Забывание этого приводит к зависанию задачи на неопределенное время.
В реализации MySelect нет явного использования аргумента Context. Вместо этого требование waker выполняется путем передачи cx внутренним future. Поскольку внутренний future также должен удовлетворять требованию waker, возвращая Poll::Pending только при получении Poll::Pending от внутреннего future, MySelect также удовлетворяет требованию waker.
Синтаксис
Макрос select! может обрабатывать более двух веток. Текущий предел - 64 ветки. Каждая ветка структурирована как:
<pattern> = <async expression> => <handler>,
Когда макрос select вычисляется, все <async expression> агрегируются и выполняются параллельно. Когда выражение завершается, результат сопоставляется с <pattern>. Если результат соответствует шаблону, то все оставшиеся асинхронные выражения отбрасываются и выполняется <handler>. Выражение <handler> имеет доступ к любым привязкам, установленным <pattern>.
Базовый случай для <pattern> - это имя переменной, результат асинхронного выражения привязывается к имени переменной, и <handler> имеет доступ к этой переменной. Вот почему в исходном примере val использовался для <pattern>, и <handler> мог получить доступ к val.
Если <pattern> не соответствует результату асинхронного вычисления, то оставшиеся асинхронные выражения продолжают выполняться параллельно до завершения следующего. В этот момент та же логика применяется к этому результату.
Поскольку select! принимает любое асинхронное выражение, можно определить более сложные вычисления для выбора.
Здесь мы выбираем между выводом oneshot-канала и TCP-соединением.
use tokio::net::TcpStream; use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx, rx) = oneshot::channel(); // Создаем задачу, которая отправляет сообщение через oneshot tokio::spawn(async move { tx.send("done").unwrap(); }); tokio::select! { socket = TcpStream::connect("localhost:3465") => { println!("Socket connected {:?}", socket); } msg = rx => { println!("received message first {:?}", msg); } } }
Здесь мы выбираем между oneshot и принятием сокетов из TcpListener.
use tokio::net::TcpListener; use tokio::sync::oneshot; use std::io; #[tokio::main] async fn main() -> io::Result<()> { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { tx.send(()).unwrap(); }); let mut listener = TcpListener::bind("localhost:3465").await?; tokio::select! { _ = async { loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { process(socket) }); } // Помогаем выводу типов Rust Ok::<_, io::Error>(()) } => {} _ = rx => { println!("terminating accept loop"); } } Ok(()) } async fn process(_: tokio::net::TcpStream) {}
Цикл принятия выполняется до тех пор, пока не встретится ошибка или rx не получит значение. Шаблон _ указывает, что мы не заинтересованы в возвращаемом значении асинхронного вычисления.
Возвращаемое значение
Макрос tokio::select! возвращает результат вычисленного выражения <handler>.
async fn computation1() -> String { // .. вычисление unimplemented!(); } async fn computation2() -> String { // .. вычисление unimplemented!(); } fn dox() { #[tokio::main] async fn main() { let out = tokio::select! { res1 = computation1() => res1, res2 = computation2() => res2, }; println!("Got = {}", out); } }
Из-за этого требуется, чтобы выражение <handler> для каждой ветки вычислялось в один и тот же тип. Если вывод выражения select! не нужен, хорошей практикой является вычисление выражения в ().
Ошибки
Использование оператора ? распространяет ошибку из выражения. Как это работает, зависит от того, используется ли ? из асинхронного выражения или из обработчика. Использование ? в асинхронном выражении распространяет ошибку из асинхронного выражения. Это делает вывод асинхронного выражения Result. Использование ? из обработчика немедленно распространяет ошибку из выражения select!. Давайте снова посмотрим на пример цикла принятия:
use tokio::net::TcpListener; use tokio::sync::oneshot; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // [настройка `rx` oneshot-канала] let (tx, rx) = oneshot::channel(); tx.send(()).unwrap(); let listener = TcpListener::bind("localhost:3465").await?; tokio::select! { res = async { loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { process(socket) }); } // Помогаем выводу типов Rust Ok::<_, io::Error>(()) } => { res?; } _ = rx => { println!("terminating accept loop"); } } Ok(()) } async fn process(_: tokio::net::TcpStream) {}
Обратите внимание на listener.accept().await?. Оператор ? распространяет ошибку из этого выражения и в привязку res. При ошибке res будет установлен в Err(_). Затем в обработчике оператор ? используется снова. Оператор res? будет распространять ошибку из функции main.
Сопоставление с образцом
Напомним, что синтаксис ветки макроса select! был определен как:
<pattern> = <async expression> => <handler>,
До сих пор мы использовали только привязки переменных для <pattern>. Однако можно использовать любой шаблон Rust. Например, скажем, мы получаем из нескольких MPSC-каналов, мы можем сделать что-то вроде этого:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (mut tx1, mut rx1) = mpsc::channel(128); let (mut tx2, mut rx2) = mpsc::channel(128); tokio::spawn(async move { // Делаем что-то с `tx1` и `tx2` tx1.send(1).await.unwrap(); tx2.send(2).await.unwrap(); }); tokio::select! { Some(v) = rx1.recv() => { println!("Got {:?} from rx1", v); } Some(v) = rx2.recv() => { println!("Got {:?} from rx2", v); } else => { println!("Both channels closed"); } } }
В этом примере выражение select! ожидает получения значения из rx1 и rx2. Если канал закрывается, recv() возвращает None. Это не соответствует шаблону, и ветка отключается. Выражение select! будет продолжать ожидать оставшиеся ветки.
Обратите внимание, что это выражение select! включает ветку else. Выражение select! должно вычисляться в значение. При использовании сопоставления с образцом возможно, что ни одна из веток не соответствует своим связанным шаблонам. Если это происходит, вычисляется ветка else.
Заимствование
При создании задач создаваемое асинхронное выражение должно владеть всеми своими данными. Макрос select! не имеет этого ограничения. Каждое асинхронное выражение ветки может заимствовать данные и работать параллельно. Следуя правилам заимствования Rust, несколько асинхронных выражений могут неизменяемо заимствовать один фрагмент данных или одно асинхронное выражение может изменяемо заимствовать фрагмент данных.
Давайте рассмотрим несколько примеров. Здесь мы одновременно отправляем одни и те же данные в два разных TCP-назначения.
use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use std::io; use std::net::SocketAddr; async fn race( data: &[u8], addr1: SocketAddr, addr2: SocketAddr ) -> io::Result<()> { tokio::select! { Ok(_) = async { let mut socket = TcpStream::connect(addr1).await?; socket.write_all(data).await?; Ok::<_, io::Error>(()) } => {} Ok(_) = async { let mut socket = TcpStream::connect(addr2).await?; socket.write_all(data).await?; Ok::<_, io::Error>(()) } => {} else => {} }; Ok(()) } fn main() {}
Переменная data заимствуется неизменяемо из обоих асинхронных выражений. Когда одна из операций завершается успешно, другая отбрасывается. Поскольку мы сопоставляем с образцом Ok(_), если выражение завершается неудачно, другое продолжает выполняться.
Когда дело доходит до <handler> каждой ветки, select! гарантирует, что выполняется только один <handler>. Из-за этого каждый <handler> может изменяемо заимствовать одни и те же данные.
Например, это изменяет out в обоих обработчиках:
use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); let mut out = String::new(); tokio::spawn(async move { // Отправляем значения на `tx1` и `tx2`. let _ = tx1.send("one"); let _ = tx2.send("two"); }); tokio::select! { _ = rx1 => { out.push_str("rx1 completed"); } _ = rx2 => { out.push_str("rx2 completed"); } } println!("{}", out); }
Циклы
Макрос select! часто используется в циклах. Этот раздел пройдет через некоторые примеры, чтобы показать общие способы использования макроса select! в цикле. Мы начнем с выбора из нескольких каналов:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx1, mut rx1) = mpsc::channel(128); let (tx2, mut rx2) = mpsc::channel(128); let (tx3, mut rx3) = mpsc::channel(128); tx1.clone().send("hello").await.unwrap(); drop((tx1, tx2, tx3)); loop { let msg = tokio::select! { Some(msg) = rx1.recv() => msg, Some(msg) = rx2.recv() => msg, Some(msg) = rx3.recv() => msg, else => { break } }; println!("Got {:?}", msg); } println!("All channels have been closed."); }
Этот пример выбирает из трех получателей каналов. Когда сообщение получено на любом канале, оно записывается в STDOUT. Когда канал закрывается, recv() возвращает None. Используя сопоставление с образцом, макрос select! продолжает ожидать оставшиеся каналы. Когда все каналы закрыты, вычисляется ветка else, и цикл завершается.
Макрос select! случайным образом выбирает ветки для проверки готовности первой. Когда несколько каналов имеют ожидающие значения, случайный канал будет выбран для получения. Это нужно для обработки случая, когда цикл приема обрабатывает сообщения медленнее, чем они помещаются в каналы, что означает, что каналы начинают заполняться. Если бы select! не выбирал случайным образом ветку для проверки первой, на каждой итерации цикла rx1 проверялась бы первой. Если rx1 всегда содержал новое сообщение, оставшиеся каналы никогда не проверялись бы.
информация Если при вычислении
select!несколько каналов имеют ожидающие сообщения, только из одного канала извлекается значение. Все остальные каналы остаются нетронутыми, и их сообщения остаются в этих каналах до следующей итерации цикла. Ни одно сообщение не теряется.
Возобновление асинхронной операции
Теперь мы покажем, как запускать асинхронную операцию в нескольких вызовах select!. В этом примере у нас есть MPSC-канал с типом элемента i32 и асинхронная функция. Мы хотим запускать асинхронную функцию до тех пор, пока она не завершится или не будет получено четное целое число из канала.
async fn action() { // Некоторая асинхронная логика } #[tokio::main] async fn main() { let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); tokio::spawn(async move { let _ = tx.send(1).await; let _ = tx.send(2).await; }); let operation = action(); tokio::pin!(operation); loop { tokio::select! { _ = &mut operation => break, Some(v) = rx.recv() => { if v % 2 == 0 { break; } } } } }
Обратите внимание, что вместо вызова action() в макросе select!, он вызывается вне цикла. Возврат action() присваивается operation без вызова .await. Затем мы вызываем tokio::pin! на operation.
Внутри цикла select! вместо передачи operation мы передаем &mut operation. Переменная operation отслеживает выполняющуюся асинхронную операцию. Каждая итерация цикла использует ту же операцию вместо вызова нового вызова action().
Другая ветка select! получает сообщение из канала. Если сообщение четное, мы заканчиваем цикл. В противном случае снова запускаем select!.
Это первый раз, когда мы используем tokio::pin!. Мы не будем вдаваться в детали закрепления (pinning) сейчас. Важно отметить, что для .await ссылки значение, на которое ссылаются, должно быть закреплено или реализовывать Unpin.
Если мы удалим строку tokio::pin! и попытаемся скомпилировать, мы получим следующую ошибку:
error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`
Хотя мы рассматривали Future в предыдущей главе, эта ошибка все еще не очень понятна. Если вы столкнулись с такой ошибкой о том, что Future не реализован при попытке вызвать .await на ссылке, то future, вероятно, нужно закрепить.
Подробнее о Pin в стандартной библиотеке.
Изменение ветки
Давайте рассмотрим немного более сложный цикл. У нас есть:
- Канал значений
i32. - Асинхронная операция для выполнения над значениями
i32.
Логика, которую мы хотим реализовать:
- Ждать четное число в канале.
- Запустить асинхронную операцию, используя четное число в качестве ввода.
- Ждать операцию, но в то же время слушать больше четных чисел в канале.
- Если новое четное число получено до завершения существующей операции, прервать существующую операцию и начать ее заново с новым четным числом.
async fn action(input: Option<i32>) -> Option<String> { // Если ввод `None`, вернуть `None`. // Это также можно записать как `let i = input?;` let i = match input { Some(input) => input, None => return None, }; // асинхронная логика здесь Some(i.to_string()) } #[tokio::main] async fn main() { let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); let mut done = false; let operation = action(None); tokio::pin!(operation); tokio::spawn(async move { let _ = tx.send(1).await; let _ = tx.send(3).await; let _ = tx.send(2).await; }); loop { tokio::select! { res = &mut operation, if !done => { done = true; if let Some(v) = res { println!("GOT = {}", v); return; } } Some(v) = rx.recv() => { if v % 2 == 0 { // `.set` - метод на `Pin`. operation.set(action(Some(v))); done = false; } } } } }
Мы используем стратегию, аналогичную предыдущему примеру. Асинхронная функция вызывается вне цикла и присваивается operation. Переменная operation закрепляется. Цикл выбирает между operation и получателем канала.
Обратите внимание, как action принимает Option<i32> в качестве аргумента. Прежде чем мы получим первое четное число, нам нужно создать экземпляр operation во что-то. Мы заставляем action принимать Option и возвращать Option. Если передается None, возвращается None. Первая итерация цикла, operation немедленно завершается с None.
Этот пример использует некоторый новый синтаксис. Первая ветка включает , if !done. Это предусловие ветки. Прежде чем объяснять, как это работает, давайте посмотрим, что произойдет, если предусловие опущено. Если опустить , if !done и запустить пример, получится следующий вывод:
thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Эта ошибка происходит при попытке использовать operation после того, как она уже завершилась. Обычно при использовании .await значение, которое ожидается, потребляется. В этом примере мы ожидаем ссылку. Это означает, что operation все еще существует после ее завершения.
Чтобы избежать этой паники, мы должны позаботиться о том, чтобы отключить первую ветку, если operation завершилась. Переменная done используется для отслеживания, завершилась ли operation. Ветка select! может включать предусловие. Это предусловие проверяется до того, как select! ожидает ветку. Если условие вычисляется в false, то ветка отключается. Переменная done инициализируется в false. Когда operation завершается, done устанавливается в true. Следующая итерация цикла отключит ветку operation. Когда из канала получено четное сообщение, operation сбрасывается, и done устанавливается в false.
Параллелизм на уровне задачи
И tokio::spawn, и select! позволяют запускать параллельные асинхронные операции. Однако стратегия, используемая для запуска параллельных операций, отличается. Функция tokio::spawn принимает асинхронную операцию и создает новую задачу для ее выполнения. Задача - это объект, который планируется средой выполнения Tokio. Две разные задачи планируются Tokio независимо. Они могут выполняться одновременно в разных потоках операционной системы. Из-за этого созданная задача имеет те же ограничения, что и созданный поток: без заимствования.
Макрос select! запускает все ветки параллельно в одной и той же задаче. Поскольку все ветки макроса select! выполняются в одной и той же задаче, они никогда не будут выполняться одновременно. Макрос select! мультиплексирует асинхронные операции в одной задаче.
Streams
Поток (stream) - это асинхронная последовательность значений. Это асинхронный эквивалент Rust'ового std::iter::Iterator, представленный трейтом Stream. Потоки можно итерировать в async функциях. Они также могут быть преобразованы с помощью адаптеров. Tokio предоставляет ряд общих адаптеров в трейте StreamExt.
Tokio предоставляет поддержку потоков в отдельном крейте: tokio-stream.
tokio-stream = "0.1"
информация В настоящее время утилиты потоков Tokio находятся в крейте
tokio-stream. Когда трейтStreamстабилизируется в стандартной библиотеке Rust, утилиты потоков Tokio будут перемещены в крейтtokio.
Итерация
В настоящее время язык программирования Rust не поддерживает асинхронные циклы for. Вместо этого итерация потоков выполняется с использованием цикла while let в паре с StreamExt::next().
use tokio_stream::StreamExt; #[tokio::main] async fn main() { let mut stream = tokio_stream::iter(&[1, 2, 3]); while let Some(v) = stream.next().await { println!("GOT = {:?}", v); } }
Как и итераторы, метод next() возвращает Option<T>, где T - тип значения потока. Получение None указывает на завершение итерации потока.
Mini-Redis broadcast
Давайте рассмотрим немного более сложный пример с использованием клиента Mini-Redis.
Полный код можно найти здесь.
use tokio_stream::StreamExt; use mini_redis::client; async fn publish() -> mini_redis::Result<()> { let mut client = client::connect("127.0.0.1:6379").await?; // Публикуем некоторые данные client.publish("numbers", "1".into()).await?; client.publish("numbers", "two".into()).await?; client.publish("numbers", "3".into()).await?; client.publish("numbers", "four".into()).await?; client.publish("numbers", "five".into()).await?; client.publish("numbers", "6".into()).await?; Ok(()) } async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber.into_stream(); tokio::pin!(messages); while let Some(msg) = messages.next().await { println!("got = {:?}", msg); } Ok(()) } fn dox() { #[tokio::main] async fn main() -> mini_redis::Result<()> { tokio::spawn(async { publish().await }); subscribe().await?; println!("DONE"); Ok(()) } }
Создается задача для публикации сообщений на сервер Mini-Redis в канал "numbers". Затем в главной задаче мы подписываемся на канал "numbers" и отображаем полученные сообщения.
После подписки вызывается into_stream() на возвращенном подписчике. Это потребляет Subscriber, возвращая поток, который выдает сообщения по мере их поступления. Прежде чем мы начнем итерировать сообщения, обратите внимание, что поток закрепляется в стеке с помощью tokio::pin!. Вызов next() на потоке требует, чтобы поток был закреплен. Функция into_stream() возвращает поток, который не закреплен, мы должны явно закрепить его, чтобы итерировать.
информация Значение Rust "закрепляется", когда оно больше не может быть перемещено в памяти. Ключевое свойство закрепленного значения заключается в том, что можно брать указатели на закрепленные данные, и вызывающая сторона может быть уверена, что указатель останется действительным. Эта функция используется
async/awaitдля поддержки заимствования данных между точками.await.
Если мы забудем закрепить поток, мы получим ошибку типа:
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
Если вы столкнулись с таким сообщением об ошибке, попробуйте закрепить значение!
Прежде чем попытаться запустить это, запустите сервер Mini-Redis:
$ mini-redis-server
Затем попробуйте запустить код. Мы увидим сообщения, выведенные в STDOUT.
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
Некоторые ранние сообщения могут быть потеряны из-за гонки между подпиской и публикацией. Программа никогда не завершается. Подписка на канал Mini-Redis остается активной, пока активен сервер.
Давайте посмотрим, как мы можем работать с потоками, чтобы расширить эту программу.
Адаптеры
Функции, которые принимают Stream и возвращают другой Stream, часто называются 'адаптерами потоков', поскольку они являются формой 'шаблона адаптера'. Общие адаптеры потоков включают map, take и filter.
Давайте обновим Mini-Redis так, чтобы он завершался. После получения трех сообщений остановим итерацию сообщений. Это делается с помощью take. Этот адаптер ограничивает поток, чтобы он выдавал не более n сообщений.
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .take(3); Ok(()) } }
Запустив программу снова, мы получим:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
На этот раз программа завершается.
Теперь давайте ограничим поток однозначными числами. Мы проверим это, проверив длину сообщения. Мы используем адаптер filter, чтобы отбросить любое сообщение, которое не соответствует предикату.
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .take(3); Ok(()) } }
Запустив программу снова, мы получим:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
Обратите внимание, что порядок применения адаптеров имеет значение. Вызов filter сначала, затем take отличается от вызова take затем filter.
Наконец, мы приведем в порядок вывод, убрав часть Ok(Message { ... }) вывода. Это делается с помощью map. Поскольку это применяется после filter, мы знаем, что сообщение Ok, поэтому можем использовать unwrap().
#![allow(unused)] fn main() { use mini_redis::client; use tokio_stream::StreamExt; async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .map(|msg| msg.unwrap().content) .take(3); Ok(()) } }
Теперь вывод:
got = b"1"
got = b"3"
got = b"6"
Другой вариант - объединить шаги filter и map в один вызов с помощью filter_map.
Есть больше доступных адаптеров. См. список здесь.
Реализация Stream
Трейт Stream очень похож на трейт Future.
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } } }
Функция Stream::poll_next() очень похожа на Future::poll, за исключением того, что ее можно вызывать повторно для получения многих значений из потока. Как мы видели в Асинхронность в глубине, когда поток не готов вернуть значение, вместо этого возвращается Poll::Pending. Регистрируется waker задачи. Как только поток должен быть опрошен снова, waker уведомляется.
Метод size_hint() используется так же, как и с итераторами.
Обычно при ручной реализации Stream это делается путем комбинирования future и других потоков. В качестве примера давайте построим на основе future Delay, который мы реализовали в Асинхронность в глубине. Мы преобразуем его в поток, который выдает () три раза с интервалом 10 мс.
#![allow(unused)] fn main() { use tokio_stream::Stream; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::time::Instant; struct Interval { rem: usize, delay: Delay, } struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } impl Interval { fn new() -> Self { Self { rem: 3, delay: Delay { when: Instant::now() } } } } impl Stream for Interval { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { if self.rem == 0 { // Больше нет задержек return Poll::Ready(None); } match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { let when = self.delay.when + Duration::from_millis(10); self.delay = Delay { when }; self.rem -= 1; Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, } } } }
async-stream
Ручная реализация потоков с использованием трейта Stream может быть утомительной. К сожалению, язык программирования Rust еще не поддерживает синтаксис async/await для определения потоков. Это в работе, но еще не готово.
Крейт async-stream доступен как временное решение. Этот крейт предоставляет макрос stream!, который преобразует ввод в поток. Используя этот крейт, вышеуказанный интервал может быть реализован так:
#![allow(unused)] fn main() { use async_stream::stream; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tokio_stream::StreamExt; use std::time::{Duration, Instant}; struct Delay { when: Instant } impl Future for Delay { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } async fn dox() { let stream = stream! { let mut when = Instant::now(); for _ in 0..3 { let delay = Delay { when }; delay.await; yield (); when += Duration::from_millis(10); } } ; tokio::pin!(stream); while let Some(_) = stream.next().await { } } }
Темы
Этот раздел содержит самостоятельные статьи, связанные с различными темами, которые возникают при написании асинхронных приложений.
В настоящее время доступны следующие статьи по темам:
- Совместимость с синхронным кодом
- Корректное завершение работы
- Начало работы с Tracing
- Следующие шаги с Tracing
- Тестирование
Совместимость с синхронным кодом
В большинстве примеров использования Tokio мы помечаем главную функцию #[tokio::main] и делаем весь проект асинхронным.
В некоторых случаях может потребоваться запустить небольшую часть синхронного кода. Для получения дополнительной информации об этом см. spawn_blocking.
В других случаях может быть проще структурировать приложение как в основном синхронное, с меньшими или логически отдельными асинхронными частями. Например, приложение с графическим интерфейсом может захотеть запустить код GUI в основном потоке и запустить среду выполнения Tokio рядом с ним в другом потоке.
На этой странице объясняется, как можно изолировать async/await до небольшой части вашего проекта.
Во что раскрывается #[tokio::main]
Макрос #[tokio::main] - это макрос, который заменяет вашу главную функцию на не-асинхронную главную функцию, которая запускает среду выполнения и затем вызывает ваш код. Например, это:
#[tokio::main] async fn main() { println!("Hello world"); }
превращается в это:
fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { println!("Hello world"); }) }
макросом. Чтобы использовать async/await в наших собственных проектах, мы можем сделать что-то подобное, где мы используем метод block_on для входа в асинхронный контекст, где это уместно.
Синхронный интерфейс для mini-redis
В этом разделе мы рассмотрим, как построить синхронный интерфейс для mini-redis, сохраняя объект Runtime и используя его метод block_on. В следующих разделах мы обсудим некоторые альтернативные подходы и когда следует использовать каждый подход.
Интерфейс, который мы будем оборачивать, - это асинхронный тип Client. У него есть несколько методов, и мы реализуем блокирующую версию следующих методов:
Для этого мы вводим новый файл с именем src/clients/blocking_client.rs и инициализируем его структурой-оберткой вокруг асинхронного типа Client:
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::clients::client::Message;
/// Установленное соединение с сервером Redis.
pub struct BlockingClient {
/// Асинхронный `Client`.
inner: crate::clients::Client,
/// Среда выполнения `current_thread` для выполнения операций над
/// асинхронным клиентом в блокирующем режиме.
rt: Runtime,
}
impl BlockingClient {
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
// Вызываем асинхронный метод connect с использованием среды выполнения.
let inner = rt.block_on(crate::clients::Client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
}
Здесь мы включили функцию-конструктор как наш первый пример того, как выполнять асинхронные методы в не-асинхронном контексте. Мы делаем это с помощью метода block_on для типа Tokio Runtime, который выполняет асинхронный метод и возвращает его результат.
Важная деталь - использование среды выполнения current_thread. Обычно при использовании Tokio вы бы использовали среду выполнения по умолчанию multi_thread, которая создает кучу фоновых потоков, чтобы она могла эффективно запускать много вещей одновременно. Для нашего случая использования мы будем делать только одну вещь за раз, поэтому мы ничего не выиграем от запуска нескольких потоков. Это делает среду выполнения current_thread идеальным вариантом, поскольку она не создает никаких потоков.
Вызов enable_all включает драйверы IO и таймеров в среде выполнения Tokio. Если они не включены, среда выполнения не может выполнять IO или использовать таймеры.
предупреждение Поскольку среда выполнения
current_threadне создает потоки, она работает только тогда, когда вызываетсяblock_on. Как толькоblock_onвозвращает результат, все созданные задачи в этой среде выполнения заморозятся, пока вы снова не вызоветеblock_on. Используйте среду выполненияmulti_threaded, если созданные задачи должны продолжать работать, когдаblock_onне вызывается.
Как только у нас есть эта структура, большинство методов легко реализовать:
use bytes::Bytes;
use std::time::Duration;
impl BlockingClient {
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
self.rt.block_on(self.inner.get(key))
}
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
self.rt.block_on(self.inner.set(key, value))
}
pub fn set_expires(
&mut self,
key: &str,
value: Bytes,
expiration: Duration,
) -> crate::Result<()> {
self.rt.block_on(self.inner.set_expires(key, value, expiration))
}
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.rt.block_on(self.inner.publish(channel, message))
}
}
Метод Client::subscribe более интересен, потому что он преобразует Client в объект Subscriber. Мы можем реализовать его следующим образом:
/// Клиент, который вошел в режим pub/sub.
///
/// Как только клиенты подписываются на канал, они могут выполнять только
/// команды, связанные с pub/sub. Тип `BlockingClient` преобразуется
/// в тип `BlockingSubscriber`, чтобы предотвратить вызов
/// методов, не связанных с pub/sub.
pub struct BlockingSubscriber {
/// Асинхронный `Subscriber`.
inner: crate::clients::Subscriber,
/// Среда выполнения `current_thread` для выполнения операций над
/// асинхронным клиентом в блокирующем режиме.
rt: Runtime,
}
impl BlockingClient {
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
Ok(BlockingSubscriber {
inner: subscriber,
rt: self.rt,
})
}
}
impl BlockingSubscriber {
pub fn get_subscribed(&self) -> &[String] {
self.inner.get_subscribed()
}
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
self.rt.block_on(self.inner.next_message())
}
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.subscribe(channels))
}
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.unsubscribe(channels))
}
}
Итак, метод subscribe сначала использует среду выполнения для преобразования асинхронного Client в асинхронный Subscriber. Затем он сохранит полученный Subscriber вместе со средой выполнения Runtime и реализует различные методы с помощью block_on.
Обратите внимание, что асинхронная структура Subscriber имеет не-асинхронный метод get_subscribed. Чтобы обработать это, мы просто вызываем его напрямую без участия среды выполнения.
Другие подходы
Вышеуказанный раздел объясняет самый простой способ реализации синхронной обертки, но это не единственный способ. Подходы таковы:
- Создать
Runtimeи вызватьblock_onна асинхронном коде. - Создать
Runtimeиspawnзадачи на нем. - Запустить
Runtimeв отдельном потоке и отправлять ему сообщения.
Мы уже видели первый подход. Два других подхода описаны ниже.
Создание задач на среде выполнения
Объект Runtime имеет метод spawn. Когда вы вызываете этот метод, вы создаете новую фоновую задачу для выполнения в среде выполнения. Например:
use tokio::runtime::Builder; use tokio::time::{sleep, Duration}; fn main() { let runtime = Builder::new_multi_thread() .worker_threads(1) .enable_all() .build() .unwrap(); let mut handles = Vec::with_capacity(10); for i in 0..10 { handles.push(runtime.spawn(my_bg_task(i))); } // Делаем что-то времязатратное, пока выполняются фоновые задачи. std::thread::sleep(Duration::from_millis(750)); println!("Finished time-consuming task."); // Ждем завершения всех задач. for handle in handles { // Метод `spawn` возвращает `JoinHandle`. `JoinHandle` - это // future, поэтому мы можем ждать его с помощью `block_on`. runtime.block_on(handle).unwrap(); } } async fn my_bg_task(i: u64) { // Вычитая, задачи с большими значениями i спят в течение // более короткого времени. let millis = 1000 - 50 * i; println!("Task {} sleeping for {} ms.", i, millis); sleep(Duration::from_millis(millis)).await; println!("Task {} stopping.", i); }
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.
В приведенном выше примере мы создаем 10 фоновых задач в среде выполнения, затем ждем их все. В качестве примера, это может быть хорошим способом реализации фоновых сетевых запросов в графическом приложении, потому что сетевые запросы слишком времязатратны, чтобы запускать их в основном потоке GUI. Вместо этого вы создаете запрос в среде выполнения Tokio, работающей в фоне, и заставляете задачу отправлять информацию обратно в код GUI, когда запрос завершен, или даже постепенно, если вы хотите индикатор выполнения.
В этом примере важно, что среда выполнения настроена как среда выполнения multi_thread. Если вы измените ее на среду выполнения current_thread, вы обнаружите, что времязатратная задача завершается до того, как любая из фоновых задач начнется. Это потому, что фоновые задачи, созданные в среде выполнения current_thread, будут выполняться только во время вызовов block_on, так как в противном случае среда выполнения не имеет места для их запуска.
Пример ждет завершения созданных задач, вызывая block_on на JoinHandle, возвращенном вызовом spawn, но это не единственный способ сделать это. Вот некоторые альтернативы:
- Использовать канал передачи сообщений, такой как
tokio::sync::mpsc. - Изменять общее значение, защищенное, например,
Mutex. Это может быть хорошим подходом для индикатора выполнения в GUI, где GUI читает общее значение каждый кадр.
Метод spawn также доступен для типа Handle. Тип Handle можно клонировать, чтобы получить много handles к среде выполнения, и каждый Handle можно использовать для создания новых задач в среде выполнения.
Отправка сообщений
Третий метод - создать среду выполнения и использовать передачу сообщений для общения с ней. Это включает немного больше шаблонного кода, чем два других подхода, но это самый гибкий подход. Вы можете найти базовый пример ниже:
#![allow(unused)] fn main() { use tokio::runtime::Builder; use tokio::sync::mpsc; pub struct Task { name: String, // информация, описывающая задачу } async fn handle_task(task: Task) { println!("Got task {}", task.name); } #[derive(Clone)] pub struct TaskSpawner { spawn: mpsc::Sender<Task>, } impl TaskSpawner { pub fn new() -> TaskSpawner { // Настраиваем канал для общения. let (send, mut recv) = mpsc::channel(16); // Строим среду выполнения для нового потока. // // Среда выполнения создается перед созданием потока // для более чистого перенаправления ошибок, если `unwrap()` // паникует. let rt = Builder::new_current_thread() .enable_all() .build() .unwrap(); std::thread::spawn(move || { rt.block_on(async move { while let Some(task) = recv.recv().await { tokio::spawn(handle_task(task)); } // Как только все отправители выйдут из области видимости, // вызов `.recv()` вернет None, и он выйдет из // цикла while и завершит поток. }); }); TaskSpawner { spawn: send, } } pub fn spawn_task(&self, task: Task) { match self.spawn.blocking_send(task) { Ok(()) => {}, Err(_) => panic!("The shared runtime has shut down."), } } } }
Этот пример можно настроить многими способами. Например, вы можете использовать Semaphore для ограничения количества активных задач, или вы можете использовать канал в обратном направлении для отправки ответа создателю. Когда вы создаете среду выполнения таким образом, это тип [актора].
Корректное завершение работы
Цель этой страницы - дать обзор того, как правильно реализовать завершение работы в асинхронных приложениях.
Обычно есть три части реализации корректного завершения работы:
- Определение момента для завершения работы.
- Уведомление каждой части программы о необходимости завершения работы.
- Ожидание завершения работы других частей программы.
Остальная часть статьи пройдет через эти части. Реальная реализация подхода, описанного здесь, может быть найдена в mini-redis, в частности в файлах src/server.rs и src/shutdown.rs.
Определение момента для завершения работы
Это, конечно, будет зависеть от приложения, но одним очень распространенным критерием завершения работы является получение приложением сигнала от операционной системы. Это происходит, например, когда вы нажимаете ctrl+c в терминале во время работы программы. Чтобы обнаружить это, Tokio предоставляет функцию tokio::signal::ctrl_c, которая будет ждать до получения такого сигнала. Вы можете использовать ее так:
use tokio::signal;
#[tokio::main]
async fn main() {
// ... создаем приложение как отдельную задачу ...
match signal::ctrl_c().await {
Ok(()) => {},
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
// мы также завершаем работу в случае ошибки
},
}
// отправляем сигнал завершения работы приложению и ждем
}
Если у вас есть несколько условий завершения работы, вы можете использовать mpsc канал для отправки сигнала завершения работы в одно место. Затем вы можете использовать select на ctrl_c и канале. Например:
use tokio::signal;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
// ... создаем приложение как отдельную задачу ...
//
// приложение использует shutdown_send в случае, если завершение работы было инициировано изнутри
// приложения
tokio::select! {
_ = signal::ctrl_c() => {},
_ = shutdown_recv.recv() => {},
}
// отправляем сигнал завершения работы приложению и ждем
}
Уведомление о необходимости завершения работы
Когда вы хотите сообщить одной или нескольким задачам о завершении работы, вы можете использовать [Токены отмены][cancellation-tokens]. Эти токены позволяют уведомлять задачи, что они должны завершиться в ответ на запрос отмены, что упрощает реализацию корректного завершения работы.
Чтобы разделить CancellationToken между несколькими задачами, вы должны клонировать его. Это связано с правилом единоличного владения, которое требует, чтобы каждое значение имело единственного владельца. При клонировании токена вы получаете другой токен, неотличимый от оригинала; если один отменен, то другой также отменен. Вы можете сделать столько клонов, сколько нужно, и когда вы вызываете cancel на одном из них, все они отменяются.
Вот шаги для использования CancellationToken в нескольких задачах:
- Сначала создайте новый
CancellationToken. - Затем создайте клон оригинального
CancellationToken, вызвав методcloneна оригинальном токене. Это создаст новый токен, который может использоваться другой задачей. - Передайте оригинальный или клонированный токен задачам, которые должны реагировать на запросы отмены.
- Когда вы хотите корректно завершить задачи, вызовите метод
cancelна оригинальном или клонированном токене. Любая задача, слушающая запрос отмены на оригинальном или клонированном токене, будет уведомлена о завершении работы.
Вот фрагмент кода, демонстрирующий вышеупомянутые шаги:
// Шаг 1: Создаем новый CancellationToken
let token = CancellationToken::new();
// Шаг 2: Клонируем токен для использования в другой задаче
let cloned_token = token.clone();
// Задача 1 - Ждем отмены токена или долгого времени
let task1_handle = tokio::spawn(async move {
tokio::select! {
// Шаг 3: Используем клонированный токен для прослушивания запросов отмены
_ = cloned_token.cancelled() => {
// Токен был отменен, задача может завершиться
}
_ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
// Долгая работа завершена
}
}
});
// Задача 2 - Отменяем оригинальный токен после небольшой задержки
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// Шаг 4: Отменяем оригинальный или клонированный токен, чтобы уведомить другие задачи о корректном завершении работы
token.cancel();
});
// Ждем завершения задач
task1_handle.await.unwrap()
С Токенами отмены вам не нужно немедленно завершать задачу при отмене токена. Вместо этого вы можете выполнить процедуру завершения работы перед завершением задачи, такую как сброс данных в файл или базу данных, или отправка сообщения о завершении работы по соединению.
Ожидание завершения работы
После того как вы сообщили другим задачам о завершении работы, вам нужно будет дождаться их завершения. Один простой способ сделать это - использовать трекер задач. Трекер задач - это коллекция задач. Метод wait трекера задач дает вам future, который разрешается только после того, как все содержащиеся в нем future разрешатся и трекер задач будет закрыт.
Следующий пример создаст 10 задач, затем использует трекер задач для ожидания их завершения.
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::task::TaskTracker;
#[tokio::main]
async fn main() {
let tracker = TaskTracker::new();
for i in 0..10 {
tracker.spawn(some_operation(i));
}
// Как только мы создали все, мы закрываем трекер.
tracker.close();
// Ждем завершения всего.
tracker.wait().await;
println!("Это печатается после всех задач.");
}
async fn some_operation(i: u64) {
sleep(Duration::from_millis(100 * i)).await;
println!("Задача {} завершается.", i);
}
Начало работы с Tracing
Крейт
tracing- это фреймворк для инструментирования программ на Rust с целью сбора структурированной, событийно-ориентированной диагностической информации.
В асинхронных системах, таких как Tokio, интерпретация традиционных лог-сообщений часто может быть довольно сложной. Поскольку отдельные задачи мультиплексируются в одном потоке, связанные события и строки логов перемешиваются, что затрудняет отслеживание потока логики. tracing расширяет диагностику в стиле логирования, позволяя библиотекам и приложениям записывать структурированные события с дополнительной информацией о временности и причинности — в отличие от лог-сообщения, Span в tracing имеет время начала и окончания, может входить и выходить из потока выполнения и может существовать внутри вложенного дерева подобных спэнов. Для представления вещей, которые происходят в единственный момент времени, tracing предоставляет дополнительную концепцию событий. И Span, и Event являются структурированными, с возможностью записывать типизированные данные, а также текстовые сообщения.
Вы можете использовать tracing для:
- отправки распределенных трассировок в коллектор OpenTelemetry
- отладки вашего приложения с помощью Tokio Console
- логирования в
stdout, лог-файл илиjournald - профилирования того, где ваше приложение проводит время
Настройка
Для начала добавьте tracing и tracing-subscriber как зависимости:
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3"
Крейт tracing предоставляет API, который мы будем использовать для отправки трассировок. Крейт tracing-subscriber предоставляет некоторые базовые утилиты для перенаправления этих трассировок внешним слушателям (например, stdout).
Подписка на трассировки
Если вы создаете исполняемый файл (в отличие от библиотеки), вам нужно зарегистрировать tracing подписчика. Подписчики - это типы, которые обрабатывают трассировки, отправляемые вашим приложением и его зависимостями, и могут выполнять такие задачи, как вычисление метрик, мониторинг ошибок и повторная отправка трассировок во внешний мир (например, в journald, stdout или демон open-telemetry).
В большинстве случаев вы должны зарегистрировать свой tracing подписчик как можно раньше в вашей функции main. Например, тип FmtSubscriber, предоставляемый tracing-subscriber, который печатает форматированные трассировки и события в stdout, может быть зарегистрирован так:
mod mini_redis { pub type Error = Box<dyn std::error::Error + Send + Sync>; pub type Result<T> = std::result::Result<T, Error>; } #[tokio::main] pub async fn main() -> mini_redis::Result<()> { // создаем подписчика, который печатает форматированные трассировки в stdout let subscriber = tracing_subscriber::FmtSubscriber::new(); // используем этого подписчика для обработки трассировок, отправленных после этого момента tracing::subscriber::set_global_default(subscriber)?; /* ... */ Ok(()) }
Если вы запустите ваше приложение сейчас, вы можете увидеть некоторые события трассировки, отправленные Tokio, но вам нужно будет изменить ваше собственное приложение для отправки трассировок, чтобы получить максимальную отдачу от tracing.
Конфигурация подписчика
В приведенном выше примере мы настроили FmtSubscriber с его конфигурацией по умолчанию. Однако tracing-subscriber также предоставляет ряд способов настройки поведения FmtSubscriber, таких как настройка формата вывода, включение дополнительной информации (такой как идентификаторы потоков или местоположения исходного кода) в логи и запись логов куда-то кроме stdout.
Например:
#![allow(unused)] fn main() { // Начинаем настраивать подписчика `fmt` let subscriber = tracing_subscriber::fmt() // Используем более компактный, сокращенный формат лога .compact() // Отображаем пути к файлам исходного кода .with_file(true) // Отображаем номера строк исходного кода .with_line_number(true) // Отображаем идентификатор потока, на котором было записано событие .with_thread_ids(true) // Не отображаем цель события (путь модуля) .with_target(false) // Собираем подписчика .finish(); }
Для деталей о доступных опциях конфигурации см. документацию tracing_subscriber::fmt.
В дополнение к типу FmtSubscriber из tracing-subscriber, другие Subscriber могут реализовывать свои собственные способы записи данных tracing. Это включает альтернативные форматы вывода, анализ и агрегацию, а также интеграцию с другими системами, такими как распределенная трассировка или сервисы агрегации логов. Ряд крейтов предоставляет дополнительные реализации Subscriber, которые могут быть интересны. См. здесь для (неполного) списка дополнительных реализаций Subscriber.
Наконец, в некоторых случаях может быть полезно объединить несколько различных способов записи трассировок вместе, чтобы построить единственный Subscriber, который реализует несколько поведений. Для этой цели крейт tracing-subscriber предоставляет трейт Layer, который представляет компонент, который может быть скомпонован вместе с другими Layer для формирования Subscriber. См. здесь для деталей об использовании Layer.
Отправка спэнов и событий
Самый простой способ отправлять спэны - с помощью прока-макрос аннотации instrument, предоставляемой tracing, которая переписывает тела функций для отправки спэнов каждый раз, когда они вызываются; например:
#![allow(unused)] fn main() { #[tracing::instrument] fn trace_me(a: u32, b: u32) -> u32 { a + b } }
Каждый вызов trace_me будет отправлять tracing Span, который:
- имеет уровень важности
info("золотая середина" важности), - назван
trace_me, - имеет поля
aиb, чьи значения являются аргументамиtrace_me
Атрибут instrument высоко настраиваем; например, для трассировки метода в mini-redis-server, который обрабатывает каждое соединение:
#![allow(unused)] fn main() { struct Handler { connection: tokio::net::TcpStream, } pub type Error = Box<dyn std::error::Error + Send + Sync>; pub type Result<T> = std::result::Result<T, Error>; use tracing::instrument; impl Handler { /// Обработать одно соединение. #[instrument( name = "Handler::run", skip(self), fields( // `%` сериализует IP-адрес пира с помощью `Display` peer_addr = %self.connection.peer_addr().unwrap() ), )] async fn run(&mut self) -> mini_redis::Result<()> { /* ... */ Ok::<_, _>(()) } } }
mini-redis-server теперь будет отправлять tracing Span для каждого входящего соединения, который:
- имеет уровень важности
info("золотая середина" важности), - назван
Handler::run, - имеет некоторые структурированные данные, связанные с ним.
fields(...)указывает, что отправляемый спэн должен включать представлениеfmt::DisplaySocketAddrсоединения в поле под названиемpeer_addr.skip(self)указывает, что отправляемый спэн не должен записывать отладочное представлениеHandler.
Вы также можете построить Span вручную, вызвав макрос span! или любой из его уровневых сокращений (error_span!, warn_span!, info_span!, debug_span!, trace_span!).
Чтобы отправлять события, вызовите макрос event! или любой из его уровневых сокращений (error!, warn!, info!, debug!, trace!). Например, чтобы залогировать, что клиент отправил некорректную команду:
#![allow(unused)] fn main() { type Error = Box<dyn std::error::Error + Send + Sync>; type Result<T> = std::result::Result<T, Error>; struct Command; impl Command { fn from_frame<T>(frame: T) -> Result<()> { Result::Ok(()) } fn from_error<T>(err: T) {} } let frame = (); // Преобразуем redis frame в структуру команды. Это возвращает // ошибку, если frame не является валидной redis командой. let cmd = match Command::from_frame(frame) { Ok(cmd) => cmd, Err(cause) => { // Frame был некорректным и не мог быть разобран. Это // вероятно указывает на проблему с клиентом (в отличие // от нашего сервера), поэтому мы (1) отправляем предупреждение... // // Синтаксис здесь - это сокращение, предоставляемое крейтом // `tracing`. Это можно рассматривать как аналогичное: // tracing::warn! { // cause = format!("{}", cause), // "failed to parse command from frame" // }; // `tracing` предоставляет структурированное логирование, поэтому информация // "логируется" как пары ключ-значение. tracing::warn! { %cause, "failed to parse command from frame" }; // ...и (2) отвечаем клиенту с ошибкой: Command::from_error(cause) } }; }
Если вы запустите ваше приложение, вы теперь будете видеть события, украшенные контекстом их спэна, отправленные для каждого входящего соединения, которое оно обрабатывает.
Следующие шаги с Tracing
Tokio-console
tokio-console - это утилита, похожая на htop, которая позволяет вам видеть представление в реальном времени спэнов (span) и событий приложения. Она также может представлять "ресурсы", которые создала среда выполнения Tokio, такие как Задачи (Tasks). Это важно для понимания проблем производительности в процессе разработки.
Например, чтобы использовать tokio-console в проекте mini-redis, вам нужно включить функцию tracing для пакета Tokio:
# Обновите импорт tokio в вашем Cargo.toml
tokio = { version = "1", features = ["full", "tracing"] }
Примечание: Функция full не включает tracing.
Вам также нужно добавить зависимость от пакета console-subscriber. Этот крейт предоставляет реализацию Subscriber, которая заменит текущую, используемую mini-redis:
# Добавьте это в раздел dependencies вашего Cargo.toml
console-subscriber = "0.1.5"
Наконец, в src/bin/server.rs замените вызов tracing_subscriber на вызов console-subscriber:
Замените это:
#![allow(unused)] fn main() { use std::error::Error; tracing_subscriber::fmt::try_init()?; Ok::<(), Box<dyn Error + Send + Sync + 'static>>(()) }
...на это:
#![allow(unused)] fn main() { console_subscriber::init(); }
Это включит console_subscriber, что означает, что любая инструментация, относящаяся к tokio-console, будет записываться. Логирование в stdout все равно будет происходить (на основе значения переменной окружения RUST_LOG).
Теперь мы должны быть готовы снова запустить mini-redis, на этот раз используя флаг tokio_unstable (который необходим для включения tracing):
RUSTFLAGS="--cfg tokio_unstable" cargo run --bin mini-redis-server
Флаг tokio_unstable позволяет нам использовать дополнительные API, предоставляемые Tokio, которые в настоящее время не имеют гарантии стабильности (другими словами, для этих API допускаются критические изменения).
Осталось только запустить саму консоль в другом терминале. Самый простой способ сделать это - установить ее из crates.io:
cargo install --locked tokio-console
и затем запустить ее с помощью:
tokio-console
Первоначальный вид, который вы увидите, - это Задачи (Tasks) Tokio, которые в настоящее время выполняются.
Пример: 
Она также может показывать Задачи в течение некоторого времени после их завершения (цвет для них будет серым). Вы можете сгенерировать некоторые трассировки, запустив пример hello world для mini-redis (это доступно в репозитории mini-redis):
cargo run --example hello_world
Если вы нажмете r, вы можете переключиться на представление Ресурсов (Resources). Это отображает семафоры, мьютексы и другие конструкции, которые используются средой выполнения Tokio.
Пример: 
Всякий раз, когда вам нужно проанализировать среду выполнения Tokio, чтобы лучше понять производительность вашего приложения, вы можете использовать tokio-console для просмотра того, что происходит в реальном времени, помогая вам обнаружить взаимные блокировки и другие проблемы.
Чтобы узнать больше о том, как использовать tokio-console, посетите ее страницу документации.
Интеграция с OpenTelemetry
OpenTelemetry (OTel) означает несколько вещей; во-первых, это открытая спецификация, определяющая модель данных для трассировок (traces) и метрик (metrics), которая может удовлетворить потребности большинства пользователей. Это также набор SDK для конкретных языков, предоставляющих инструментацию, чтобы трассировки и метрики могли отправляться из приложения. В-третьих, есть OpenTelemetry Collector - бинарный файл, который работает вместе с вашим приложением для сбора трассировок и метрик, в конечном итоге отправляя их поставщику телеметрии, такому как DataDog, Honeycomb или AWS X-Ray. Он также может отправлять данные в такие инструменты, как Prometheus.
Крейт opentelemetry предоставляет SDK OpenTelemetry для Rust, и мы будем использовать его для этого руководства.
В этом руководстве мы настроим mini-redis для отправки данных в Jaeger, который представляет собой пользовательский интерфейс для визуализации трассировок.
Чтобы запустить экземпляр Jaeger, вы можете использовать Docker:
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
Вы можете посетить страницу Jaeger, перейдя по адресу http://localhost:16686.
Она будет выглядеть так:

Мы вернемся к этой странице, как только сгенерируем и отправим некоторые данные трассировки.
Чтобы настроить mini-redis, нам сначала нужно добавить несколько зависимостей. Обновите ваш Cargo.toml следующим образом:
# Реализует типы, определенные в спецификации Otel
opentelemetry = "0.17.0"
# Интеграция между крейтом tracing и крейтом opentelemetry
tracing-opentelemetry = "0.17.2"
# Позволяет экспортировать данные в Jaeger
opentelemetry-jaeger = "0.16.0"
Теперь в src/bin/server.rs добавьте следующие импорты:
#![allow(unused)] fn main() { use opentelemetry::global; use tracing_subscriber::{ fmt, layer::SubscriberExt, util::SubscriberInitExt, }; }
Мы рассмотрим, что делает каждый из них, через мгновение.
Следующий шаг - заменить вызов tracing_subscriber на настройку OTel.
Замените это:
#![allow(unused)] fn main() { use std::error::Error; tracing_subscriber::fmt::try_init()?; Ok::<(), Box<dyn Error + Send + Sync + 'static>>(()) }
...на это:
#![allow(unused)] fn main() { use std::error::Error; use opentelemetry::global; use tracing_subscriber::{ fmt, layer::SubscriberExt, util::SubscriberInitExt, }; // Позволяет передавать контекст (т.е., идентификаторы трассировок) между сервисами global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); // Настраивает механизм, необходимый для экспорта данных в Jaeger // Есть другие крейты OTel, которые предоставляют конвейеры для поставщиков, // упомянутых ранее. let tracer = opentelemetry_jaeger::new_pipeline() .with_service_name("mini-redis") .install_simple()?; // Создаем слой tracing с настроенным трассировщиком let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); // Трейты SubscriberExt и SubscriberInitExt необходимы для расширения // Registry для принятия `opentelemetry` (типа OpenTelemetryLayer). tracing_subscriber::registry() .with(opentelemetry) // Продолжаем логировать в stdout .with(fmt::Layer::default()) .try_init()?; Ok::<(), Box<dyn Error + Send + Sync + 'static>>(()) }
Теперь вы должны быть able to запустить mini-redis:
cargo run --bin mini-redis-server
В другом терминале запустите пример hello world (это доступно в репозитории mini-redis):
cargo run --example hello_world
Теперь обновите пользовательский интерфейс Jaeger, который у нас был открыт, и на главной странице поиска найдите "mini-redis" как один из вариантов в выпадающем списке Service.
Выберите этот вариант и нажмите кнопку "Find Traces". Это должно показать запрос, который мы только что сделали, запустив пример.

Нажатие на трассировку должно показать вам детальное представление спэнов, которые были отправлены во время обработки примера hello world.

На этом пока все! Вы можете исследовать это дальше, отправляя больше запросов, добавляя дополнительную инструментацию для mini-redis или настраивая OTel с поставщиком телеметрии (вместо экземпляра Jaeger, который мы запускаем локально). Для последнего вам может понадобиться подключить дополнительный крейт (например, для отправки данных в OTel Collector вам понадобится крейт opentelemetry-otlp). Есть много примеров, доступных в репозитории opentelemetry-rust.
Примечание: Репозиторий mini-redis уже содержит полный пример OpenTelemetry с AWS X-Ray, подробности которого можно найти в README, а также в файлах Cargo.toml и src/bin/server.rs.
Тестирование
Цель этой страницы - дать рекомендации по написанию полезных модульных тестов в асинхронных приложениях.
Приостановка и возобновление времени в тестах
Иногда асинхронный код явно ожидает, вызывая tokio::time::sleep или ожидая tokio::time::Interval::tick. Тестирование поведения, основанного на времени (например, экспоненциальной отсрочки), может стать громоздким, когда модульный тест начинает работать очень медленно. Однако внутренне функциональность, связанная со временем в tokio, поддерживает приостановку и возобновление времени. Приостановка времени имеет эффект, что любой future, связанный со временем, может стать готовым досрочно. Условием досрочного разрешения future, связанного со временем, является то, что больше нет других future, которые могут стать готовыми. Это по сути перематывает время вперед, когда единственный ожидаемый future связан со временем:
#![allow(unused)] fn main() { #[tokio::test] async fn paused_time() { tokio::time::pause(); let start = std::time::Instant::now(); tokio::time::sleep(Duration::from_millis(500)).await; println!("{:?}ms", start.elapsed().as_millis()); } }
Этот код выводит 0ms на разумной машине.
Для модульных тестов часто полезно запускать с приостановленным временем на протяжении всего теста. Это можно достичь, просто установив аргумент макроса start_paused в true:
#![allow(unused)] fn main() { #[tokio::test(start_paused = true)] async fn paused_time() { let start = std::time::Instant::now(); tokio::time::sleep(Duration::from_millis(500)).await; println!("{:?}ms", start.elapsed().as_millis()); } }
Имейте в виду, что атрибут start_paused требует функции tokio test-util.
Смотрите tokio::test "Configure the runtime to start with time paused" для более подробной информации.
Конечно, временной порядок разрешения future сохраняется, даже при использовании разных future, связанных со временем:
#![allow(unused)] fn main() { #[tokio::test(start_paused = true)] async fn interval_with_paused_time() { let mut interval = interval(Duration::from_millis(300)); let _ = timeout(Duration::from_secs(1), async move { loop { interval.tick().await; println!("Tick!"); } }) .await; } }
Этот код немедленно выводит "Tick!" ровно 4 раза.
Мокирование с использованием AsyncRead и AsyncWrite
Общие трейты для асинхронного чтения и записи (AsyncRead и AsyncWrite) реализованы, например, сокетами. Они могут быть использованы для мокирования I/O, выполняемого сокетом.
Рассмотрим для настройки этот простой цикл TCP-сервера:
use tokio::net::TcpListener; #[tokio::main] async fn main() { if true { return } let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); loop { let Ok((mut socket, _)) = listener.accept().await else { eprintln!("Failed to accept client"); continue; }; tokio::spawn(async move { let (reader, writer) = socket.split(); // Запускаем некоторый обработчик клиентского соединения, например: // handle_connection(reader, writer) // .await // .expect("Failed to handle connection"); }); } }
Здесь каждое TCP-клиентское соединение обслуживается своей выделенной задачей tokio. Эта задача владеет reader и writer, которые разделены из TcpStream.
Теперь рассмотрим саму задачу обработки клиента, особенно where-клаузу сигнатуры функции:
#![allow(unused)] fn main() { use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; async fn handle_connection<Reader, Writer>( reader: Reader, mut writer: Writer, ) -> std::io::Result<()> where Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin, { let mut line = String::new(); let mut reader = BufReader::new(reader); loop { if let Ok(bytes_read) = reader.read_line(&mut line).await { if bytes_read == 0 { break Ok(()); } writer .write_all(format!("Thanks for your message.\r\n").as_bytes()) .await .unwrap(); } line.clear(); } } }
По сути, данные reader и writer, которые реализуют AsyncRead и AsyncWrite, обслуживаются последовательно. Для каждой полученной строки обработчик отвечает "Thanks for your message.".
Для модульного тестирования обработчика клиентского соединения можно использовать tokio_test::io::Builder как мок:
#![allow(unused)] fn main() { use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; async fn handle_connection<Reader, Writer>( reader: Reader, mut writer: Writer, ) -> std::io::Result<()> where Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin, { let mut line = String::new(); let mut reader = BufReader::new(reader); loop { if let Ok(bytes_read) = reader.read_line(&mut line).await { if bytes_read == 0 { break Ok(()); } writer .write_all(format!("Thanks for your message.\r\n").as_bytes()) .await .unwrap(); } line.clear(); } } #[tokio::test] async fn client_handler_replies_politely() { let reader = tokio_test::io::Builder::new() .read(b"Hi there\r\n") .read(b"How are you doing?\r\n") .build(); let writer = tokio_test::io::Builder::new() .write(b"Thanks for your message.\r\n") .write(b"Thanks for your message.\r\n") .build(); let _ = handle_connection(reader, writer).await; } }
hyper
hyper — это HTTP библиотека для языка Rust.
Если вы здесь впервые, ознакомьтесь с разделом как настроить.
- Если вы создаете веб-сервер, перейдите к руководству по серверу.
- Если вы пытаетесь взаимодействовать с сервером, перейдите к руководству по клиенту.
Вы также можете посмотреть [сгенерированную документацию API][docs].
Setup
Это поможет подготовить вашу первоначальную настройку для возможности опробовать уроки из руководств.
Зависимости
Чтобы упростить задачу, вы можете добавить следующие крейты в ваш Cargo.toml:
[dependencies]
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
И с этим вы готовы к работе! В зависимости от того, что вы хотите достичь, вы можете перейти к руководствам по клиенту или серверу.
Runtime
С версией hyper v1.0, которая удалила tokio как зависимость среды выполнения, был введен новый трейт среды выполнения hyper::rt. Если вы все еще хотите использовать tokio, реализация tokio для hyper::rt предоставляется крейтом hyper-util.
Создание собственных реализаций hyper::rt с помощью Tokio
Давайте создадим простые реализации hyper::rt с помощью tokio. Сначала убедитесь, что у вас есть tokio как зависимость в вашем Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
Теперь давайте попробуем написать простой исполнитель hyper::rt::Executor, мы назовем его TokioExecutor:
#![allow(unused)] fn main() { extern crate hyper; /// Исполнитель future, который использует потоки `tokio`. #[non_exhaustive] #[derive(Default, Debug, Clone)] pub struct TokioExecutor {} }
Трейт hyper::rt::Executor ожидает метод execute, который просит среду выполнения выполнить future. tokio позволяет это легко сделать с помощью tokio::spawn:
#![allow(unused)] fn main() { extern crate hyper; extern crate tokio; use std::future::Future; use hyper::rt::Executor; #[non_exhaustive] #[derive(Default, Debug, Clone)] pub struct TokioExecutor {} impl<Fut> Executor<Fut> for TokioExecutor where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { tokio::spawn(fut); } } }
Теперь у нас есть рабочий hyper::rt::Executor с Tokio, и он готов к использованию везде, где требуется Executor. Например, с автоматическим соединением из hyper-util:
#![allow(unused)] fn main() { extern crate hyper; extern crate hyper_util; extern crate tokio; use std::future::Future; use hyper_util::server::conn::auto; use hyper::rt::Executor; #[non_exhaustive] #[derive(Default, Debug, Clone)] pub struct TokioExecutor {} impl<Fut> Executor<Fut> for TokioExecutor where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { tokio::spawn(fut); } } impl TokioExecutor { pub fn new() -> Self { Self {} } } auto::Builder::new(TokioExecutor::new()); }
Использование реализаций hyper::rt с tokio в hyper-util
Крейт hyper-util предоставляет реализации трейтов hyper::rt с Tokio. Для использования нам понадобится иметь hyper-util как зависимость.
[dependencies]
hyper-util = { version = "0.1", features = ["full"] }
Затем вам просто нужно импортировать и использовать это, как в примере выше:
#![allow(unused)] fn main() { extern crate hyper; extern crate hyper_util; use hyper::rt::Executor; use hyper_util::rt::TokioExecutor; use hyper_util::server::conn::auto; auto::Builder::new(TokioExecutor::new()); }
В крейте hyper_util есть больше реализаций. Ознакомьтесь с документацией по hyper_util::rt для получения более подробной информации.
Server
Сервер HYPER
Начало работы с сервером
Давайте начнем с создания сервера "Hello, World!" и расширим его функциональность.
Первым делом нам нужно объявить наши зависимости. Добавим следующее в наш Cargo.toml:
[dependencies]
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
Затем нам нужно добавить несколько импортов в наш файл main.rs:
extern crate tokio; extern crate hyper; extern crate http_body_util; extern crate hyper_util; use std::convert::Infallible; use std::net::SocketAddr; use http_body_util::Full; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Request, Response}; use hyper_util::rt::TokioIo; use tokio::net::TcpListener; fn main() {}
Создание Сервиса (Service)
[Service][service] позволяет нам определить, как наш сервер будет отвечать на входящие запросы. Он представляет собой асинхронную функцию, которая принимает [Request][request] и возвращает Future. Когда обработка этого future завершается, он разрешается в [Response][response] или ошибку.
Hyper предоставляет утилиту для создания Service из функции, которая должна покрыть большинство случаев использования: [service_fn][service_fn]. Мы используем её, чтобы создать сервис из нашей функции hello, когда будем готовы запустить сервер.
extern crate hyper; extern crate http_body_util; use std::convert::Infallible; use http_body_util::Full; use hyper::body::Bytes; use hyper::{Request, Response}; fn main() {} async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello, World!")))) }
Используя эту функцию в качестве сервиса, мы указываем нашему серверу отвечать на все запросы статусом 200 OK по умолчанию. Тело ответа Body будет содержать наше дружеское приветствие в виде одного фрагмента байтов (chunk of bytes), а заголовок Content-Length будет установлен автоматически.
Запуск сервера
Наконец, нам нужно подключить наш сервис hello к работающему серверу Hyper.
Мы углубимся в детали некоторых из этих вещей в другом руководстве.
extern crate tokio; extern crate hyper; extern crate http_body_util; extern crate hyper_util; mod no_run { use std::convert::Infallible; use std::net::SocketAddr; use http_body_util::Full; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Request, Response}; use hyper_util::rt::TokioIo; use tokio::net::TcpListener; async fn hello( _: Request<hyper::body::Incoming>, ) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello World!")))) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); // Мы создаем TcpListener и привязываем его к 127.0.0.1:3000 let listener = TcpListener::bind(addr).await?; // Мы запускаем цикл для непрерывного принятия входящих соединений loop { let (stream, _) = listener.accept().await?; // Используем адаптер для доступа к чему-то, реализующему трейты `tokio::io`, как если бы они реализовывали // трейты ввода-вывода `hyper::rt`. let io = TokioIo::new(stream); // Порождаем задачу tokio для конкурентного обслуживания множественных соединений tokio::task::spawn(async move { // Наконец, мы привязываем входящее соединение к нашему сервису `hello` if let Err(err) = http1::Builder::new() // `service_fn` преобразует нашу функцию в `Service` .serve_connection(io, service_fn(hello)) .await { eprintln!("Ошибка при обслуживании соединения: {:?}", err); } }); } } } fn main() {}
Чтобы увидеть все фрагменты кода, собранные вместе, ознакомьтесь с [полным примером][example]!
Кроме того, если service_fn не удовлетворяет вашим требованиям и вы хотите реализовать Service самостоятельно, см. этот [пример][impl service].
HTTP/2
Этот пример использует модуль http1 для создания сервера, который работает по протоколу HTTP/1.
Если вы хотите использовать HTTP/2, вы можете использовать модуль http2 с небольшими изменениями по сравнению с сервером http1. Сборщик (builder) для http2 требует для работы исполнитель (executor). Исполнитель должен реализовывать трейт hyper::rt::Executor.
Чтобы реализовать Executor, ознакомьтесь с примером [runtime][runtime]. Чтобы увидеть полный пример с HTTP/2, ознакомьтесь с [полным примером][example_http2]!
[service]: {{ site.hyper_docs_url }}/hyper/service/trait.Service.html [service_fn]: {{ site.hyper_docs_url }}/hyper/service/fn.service_fn.html [request]: {{ site.hyper_docs_url }}/hyper/struct.Request.html [response]: {{ site.hyper_docs_url }}/hyper/struct.Response.html [parts]: {{ site.http_docs_url }}/http/response/struct.Parts.html [example]: {{ site.examples_url }}/hello.rs [example_http2]: {{ site.examples_url }}/hello-http2.rs [runtime]: ../init/runtime.md [impl service]: {{ site.examples_url }}/service_struct_impl.rs
Echo, echo ...
У вас уже есть сервер Hello World? Отлично! Обычно серверы делают больше, чем просто выдают одно и то же тело для каждого запроса. Чтобы продемонстрировать несколько других возможностей hyper, это руководство пройдет через создание эхо-сервера.
Эхо-сервер будет прослушивать входящие соединения и возвращать тело запроса как тело ответа для POST запросов.
Маршрутизация
Первое, что мы сделаем, помимо переименования нашего сервиса в echo, это настройка маршрутизации. Мы хотим иметь маршрут с инструкциями по использованию нашего сервера и другой для приема данных. О, и мы также должны обработать случай, когда кто-то запрашивает неизвестный маршрут!
Прежде чем начать, нам нужно добавить несколько новых импортов:
extern crate hyper; extern crate http_body_util; use hyper::body::Frame; use hyper::{Method, StatusCode}; use http_body_util::{combinators::BoxBody, BodyExt}; fn main() {}
Далее нам нужно внести некоторые изменения в нашу функцию Service, но как вы можете видеть, это все еще просто асинхронная функция, которая принимает Request и возвращает future Response, и вы можете передать ее вашему серверу так же, как мы делали для сервиса hello.
В отличие от нашего сервиса hello, где мы не заботились о теле запроса и всегда возвращали один фрагмент байтов с нашим приветствием, теперь мы хотим немного больше свободы в формировании тела нашего Response. Для этого мы изменим тип Body в нашем Response на упакованный объект трейта. Нас интересует только то, что тело ответа реализует трейт Body, что его данные - это Bytes, а его ошибка - hyper::Error.
extern crate hyper; extern crate http_body_util; use hyper::body::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::{Method, Request, Response, StatusCode}; async fn echo( req: Request<hyper::body::Incoming>, ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { match (req.method(), req.uri().path()) { (&Method::GET, "/") => Ok(Response::new(full( "Try POSTing data to /echo", ))), (&Method::POST, "/echo") => { // мы вернемся к этому Ok(Response::new(req.into_body().boxed())) }, // Возвращаем 404 Not Found для других маршрутов. _ => { let mut not_found = Response::new(empty()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } } } // Мы создаем некоторые вспомогательные функции, чтобы Empty и Full тела // соответствовали нашему расширенному типу тела Response. fn empty() -> BoxBody<Bytes, hyper::Error> { Empty::<Bytes>::new() .map_err(|never| match never {}) .boxed() } fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { Full::new(chunk.into()) .map_err(|never| match never {}) .boxed() } fn main() {}
Мы построили супер простую таблицу маршрутизации, просто сопоставляя method и path входящего Request. Если кто-то запрашивает GET /, наш сервис сообщит им, что они должны попробовать наши эхо-возможности. Мы также проверяем POST /echo, но в настоящее время ничего с этим не делаем.
Наше третье правило перехватывает любую другую комбинацию метода и пути и изменяет StatusCode Response. Статус по умолчанию для Response - это HTTP 200 OK (StatusCode::OK), что правильно для других маршрутов. Но третий случай вместо этого отправит обратно 404 Not Found.
Потоки тела
Теперь давайте реализуем это эхо. HTTP тело - это поток Frame, каждый Frame содержит части данных Body или трейлеры. Поэтому вместо того, чтобы читать все Body в буфер перед отправкой нашего ответа, мы можем передавать каждый кадр по мере его поступления. Мы начнем с самого простого решения, а затем внесем изменения, демонстрируя более сложные вещи, которые вы можете делать с потоками Body.
Первое - простое эхо. И Request, и Response имеют потоки тела, и по умолчанию вы можете легко передать Body Request в Response.
extern crate hyper; extern crate http_body_util; use hyper::body::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::{Method, Request, Response, StatusCode}; async fn echo( req: Request<hyper::body::Incoming>, ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { match (req.method(), req.uri().path()) { // Внутри match из предыдущего кода (&Method::POST, "/echo") => Ok(Response::new(req.into_body().boxed())), _ => unreachable!(), } } fn main() {}
Запуск нашего сервера теперь будет возвращать эхо любых данных, которые мы отправляем через POST на /echo. Это было легко. Что если мы хотим преобразовать весь текст в верхний регистр? Мы могли бы использовать map на наших потоках.
Отображение тела
Каждый data Frame нашего потока тела - это фрагмент байтов, который мы можем удобно представить с помощью типа Bytes из hyper. Его можно легко преобразовать в другие типичные контейнеры байтов.
Далее давайте добавим новый маршрут /echo/uppercase, отображающий каждый байт в data Frame нашего тела запроса в верхний регистр и возвращающий поток в нашем Response:
extern crate hyper; extern crate http_body_util; use hyper::body::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::body::Frame; use hyper::{Method, Request, Response, StatusCode}; async fn echo( req: Request<hyper::body::Incoming>, ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { match (req.method(), req.uri().path()) { // Еще один маршрут внутри нашего блока match... (&Method::POST, "/echo/uppercase") => { // Отображаем frame этого тела в другой тип let frame_stream = req.into_body().map_frame(|frame| { let frame = if let Ok(data) = frame.into_data() { // Преобразуем каждый байт в каждом Data frame в верхний регистр data.iter() .map(|byte| byte.to_ascii_uppercase()) .collect::<Bytes>() } else { Bytes::new() }; Frame::data(frame) }); Ok(Response::new(frame_stream.boxed())) }, _ => unreachable!(), } } fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { Full::new(chunk.into()) .map_err(|never| match never {}) .boxed() } fn main() {}
И вот так у нас есть два эхо-маршрута: /echo, который не выполняет преобразований, и /echo/uppercase, который возвращает все байты после преобразования их в ASCII верхний регистр.
Буферизация тела запроса
Что если мы хотим, чтобы наш эхо-сервис переворачивал полученные данные и отправлял их обратно нам? Мы не можем действительно передавать данные по мере их поступления, поскольку нам нужно найти конец, прежде чем мы сможем ответить. Чтобы сделать это, мы можем изучить, как легко собрать полное тело.
Мы хотим собрать все тело запроса и отобразить результат в нашу функцию reverse, затем вернуть окончательный результат. Если мы импортируем трейт-расширение http_body_util::BodyExt, мы можем вызвать метод collect на нашем теле, который доведет поток до завершения, собирая все data и trailer frames в тип Collected. Мы можем легко превратить Collected тело в единый Bytes, вызвав его метод into_bytes.
Примечание: Вы всегда должны быть осторожны, чтобы не буферизовать без максимального ограничения. Мы установим здесь максимум 64 КБ.
extern crate hyper; extern crate http_body_util; use hyper::body::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::{body::Body, Method, Request, Response, StatusCode}; async fn echo( req: Request<hyper::body::Incoming>, ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { match (req.method(), req.uri().path()) { // Еще один маршрут внутри нашего блока match... (&Method::POST, "/echo/reversed") => { // Защищаем наш сервер от огромных тел. let upper = req.body().size_hint().upper().unwrap_or(u64::MAX); if upper > 1024 * 64 { let mut resp = Response::new(full("Body too big")); *resp.status_mut() = hyper::StatusCode::PAYLOAD_TOO_LARGE; return Ok(resp); } // Ожидаем, пока все тело будет собрано в единый `Bytes`... let whole_body = req.collect().await?.to_bytes(); // Итерируем все тело в обратном порядке и собираем в новый Vec. let reversed_body = whole_body.iter() .rev() .cloned() .collect::<Vec<u8>>(); Ok(Response::new(full(reversed_body))) }, _ => unreachable!(), } } fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { Full::new(chunk.into()) .map_err(|never| match never {}) .boxed() } fn main() {}
Вы можете увидеть компилируемый [пример здесь][example].
[example]: {{ site.examples_url }}/echo.rs
Начало работы с Middleware (Промежуточным слоем) сервера
Как упоминалось в разделе [Обновление][upgrading], hyper v1 не зависит от tower в отношении трейта Service. Когда мы хотим добавить middleware, подобный tower, существует 2 подхода для его реализации.
Давайте создадим Logger middleware для [сервера hello-world][hello-world] в качестве примера:
Сначала добавим зависимость tower
[dependencies]
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
tower = "0.4" # и это
Вариант 1: Использование трейта Service из hyper
Реализуем Logger middleware для hyper
extern crate hyper; use hyper::{Request, body::Incoming, service::Service}; #[derive(Debug, Clone)] pub struct Logger<S> { inner: S, } impl<S> Logger<S> { pub fn new(inner: S) -> Self { Logger { inner } } } type Req = Request<Incoming>; impl<S> Service<Req> for Logger<S> where S: Service<Req>, { type Response = S::Response; type Error = S::Error; type Future = S::Future; fn call(&self, req: Req) -> Self::Future { println!("processing request: {} {}", req.method(), req.uri().path()); self.inner.call(req) } } fn main() {}
Затем это можно использовать в сервере:
extern crate tower; extern crate hyper; extern crate http_body_util; extern crate tokio; extern crate hyper_util; mod no_run { use std::{convert::Infallible, net::SocketAddr}; use hyper::{ service::Service, body::{Bytes, Incoming}, server::conn::http1, Request, Response, }; use http_body_util::Full; use hyper_util::rt::TokioIo; use tokio::net::TcpListener; use tower::ServiceBuilder; #[derive(Debug, Clone)] pub struct Logger<S> { inner: S, } impl<S> Logger<S> { pub fn new(inner: S) -> Self { Logger { inner } } } type Req = Request<Incoming>; impl<S> Service<Req> for Logger<S> where S: Service<Req>, { type Response = S::Response; type Error = S::Error; type Future = S::Future; fn call(&self, req: Req) -> Self::Future { println!("processing request: {} {}", req.method(), req.uri().path()); self.inner.call(req) } } async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello, World!")))) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await?; loop { let (stream, _) = listener.accept().await?; let io = TokioIo::new(stream); tokio::spawn(async move { // Важно: здесь следует использовать hyper service_fn, так как требуется реализация трейта Service из hyper! let svc = hyper::service::service_fn(hello); let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc); if let Err(err) = http1::Builder::new().serve_connection(io, svc).await { eprintln!("server error: {}", err); } }); } } } fn main() {}
Вариант 2: Использование трейта TowerToHyperService из hyper
Трейт [hyper_util::service::TowerToHyperService][adapter-trait] — это адаптер для преобразования сервиса tower в сервис hyper.
Теперь реализуем Logger middleware для tower
extern crate tower; extern crate hyper; use hyper::{Request, body::Incoming}; use tower::Service; #[derive(Debug, Clone)] pub struct Logger<S> { inner: S, } impl<S> Logger<S> { pub fn new(inner: S) -> Self { Logger { inner } } } type Req = Request<Incoming>; impl<S> Service<Req> for Logger<S> where S: Service<Req> + Clone, { type Response = S::Response; type Error = S::Error; type Future = S::Future; fn poll_ready( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx) } fn call(&mut self, req: Req) -> Self::Future { println!("processing request: {} {}", req.method(), req.uri().path()); self.inner.call(req) } } fn main() {}
Затем используем его в сервере:
extern crate hyper; extern crate http_body_util; extern crate hyper_util; extern crate tokio; extern crate tower; mod no_run { use std::{convert::Infallible, net::SocketAddr}; use hyper::{ body::{Bytes, Incoming}, server::conn::http1, Request, Response, }; use http_body_util::Full; use hyper_util::{rt::TokioIo, service::TowerToHyperService}; use tokio::net::TcpListener; use tower::{ServiceBuilder, Service}; #[derive(Debug, Clone)] pub struct Logger<S> { inner: S, } impl<S> Logger<S> { pub fn new(inner: S) -> Self { Logger { inner } } } type Req = Request<Incoming>; impl<S> Service<Req> for Logger<S> where S: Service<Req> + Clone, { type Response = S::Response; type Error = S::Error; type Future = S::Future; fn poll_ready( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx) } fn call(&mut self, req: Req) -> Self::Future { println!("processing request: {} {}", req.method(), req.uri().path()); self.inner.call(req) } } async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello, World!")))) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await?; loop { let (stream, _) = listener.accept().await?; let io = TokioIo::new(stream); tokio::spawn(async move { // Важно: здесь следует использовать tower service_fn, так как требуется сначала реализовать трейт Service из tower, а затем преобразовать в сервис hyper! let svc = tower::service_fn(hello); let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc); // Преобразуем его в сервис hyper let svc = TowerToHyperService::new(svc); if let Err(err) = http1::Builder::new().serve_connection(io, svc).await { eprintln!("server error: {}", err); } }); } } fn main() {}
[hellp-world]: {{ site.url }}/guides/1/server/hello-world/ [upgrading]: {{ site.url }}/guides/1/upgrading/ [adapter-trait]: {{ site.hyper_util_url }}/latest/hyper_util/service/struct.TowerToHyperService.html
Грациозное завершение работы сервера
Серверные соединения hyper имеют возможность инициировать грациозное завершение работы. Часто возникает необходимость координировать грациозное завершение работы всех активных соединений. Эту задачу мы и рассмотрим в данном руководстве.
Грациозное завершение работы — это когда соединение перестает принимать новые запросы, позволяя при этом завершиться текущим выполняющимся запросам.
Для этого нам понадобится несколько компонентов:
- Сигнал для начала завершения работы.
- Цикл принятия (accept loop) для обработки вновь поступающих соединений.
- Наблюдатель (watcher) для координации завершения работы.
Определение сигнала завершения
Вы можете использовать любой механизм для сигнализации о начале грациозного завершения работы. Это может быть обработчик сигналов процесса, таймер, специальный HTTP-запрос или что-либо еще.
В этом руководстве мы будем использовать обработчик сигнала CTRL+C. В Tokio есть простая поддержка для его создания:
extern crate tokio; async fn shutdown_signal() { // Ждем сигнала CTRL+C tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); } fn main() {}
Модификация цикла принятия соединений сервера
Нестабильно: Код, обсуждаемый в этом руководстве, находится в
hyper-util, который не так стабилен, как код вhyper. Он готов к использованию в production, но изменения могут происходить чаще.
Мы предполагаем, что у вас есть цикл принятия соединений для вашего сервера, аналогичный показанному в руководстве Hello World. Поэтому мы просто модифицируем его здесь:
extern crate hyper; extern crate http_body_util; extern crate hyper_util; extern crate tokio; mod no_run { use std::convert::Infallible; use std::net::SocketAddr; use http_body_util::Full; use hyper::body::Bytes; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Request, Response}; use hyper_util::rt::TokioIo; use tokio::net::TcpListener; async fn shutdown_signal() {} async fn hello( _: Request<hyper::body::Incoming>, ) -> Result<Response<Full<Bytes>>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello World!")))) } async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await?; // Указываем настройки HTTP (http1, http2, auto — все работает) let mut http = http1::Builder::new(); // Наблюдатель для грациозного завершения let graceful = hyper_util::server::graceful::GracefulShutdown::new(); // Когда этот сигнал завершится, начинаем завершение работы let mut signal = std::pin::pin!(shutdown_signal()); // Наш цикл принятия соединений сервера loop { tokio::select! { Ok((stream, _addr)) = listener.accept() => { let io = TokioIo::new(stream); let conn = http.serve_connection(io, service_fn(hello)); // Наблюдаем за этим соединением let fut = graceful.watch(conn); tokio::spawn(async move { if let Err(e) = fut.await { eprintln!("Error serving connection: {:?}", e); } }); }, _ = &mut signal => { drop(listener); eprintln!("graceful shutdown signal received"); // Останавливаем цикл принятия break; } } } // Теперь начинаем завершение работы и ждем, пока все соединения завершатся // Опционально: запускаем таймаут, чтобы ограничить время ожидания. tokio::select! { _ = graceful.shutdown() => { eprintln!("all connections gracefully closed"); }, _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { eprintln!("timed out wait for all connections to close"); } } Ok(()) } } fn main() {}
Client
О клиенте
Начало работы
Для начала мы просто получим работающий простой GET запрос к веб-странице, чтобы увидеть все взаимодействующие части. Сначала нам нужны зависимости. Сообщим Cargo о наших зависимостях, добавив это в Cargo.toml.
Зависимости
[dependencies]
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
Теперь нам нужно импортировать компоненты из наших зависимостей:
extern crate http_body_util; extern crate hyper; extern crate tokio; extern crate hyper_util; use http_body_util::Empty; use hyper::Request; use hyper::body::Bytes; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; fn main() {}
Runtime (среда выполнения)
Теперь мы сделаем запрос в main нашей программы. Это может показаться несколько сложным для простого запроса, и вы будете правы, но цель здесь — просто показать всю необходимую настройку. Как только у вас это есть, вы готовы эффективно выполнять тысячи клиентских запросов.
Нам нужно настроить какую-либо среду выполнения (runtime). Вы можете использовать любой асинхронный runtime, но в этом руководстве мы будем использовать tokio. Если вы никогда раньше не использовали futures в Rust, вам может быть полезно прочитать руководство Tokio по Futures.
extern crate tokio; mod no_run { #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { // Здесь мы настроим наши HTTP клиентские запросы. Ok(()) } } fn main() {}
Настройка
Для начала нам нужно настроить несколько вещей. В этом руководстве мы собираемся отправить GET [Request][Request] на http://httpbin.org/ip, который вернет 200 OK и IP-адрес отправителя в теле ответа.
Нам нужно открыть TCP-соединение с удаленным хостом, используя имя хоста и порт, в данном случае это httpbin.org и порт по умолчанию для HTTP: 80. После открытия соединения мы передаем его в функцию client::conn::http1::handshake, выполняя рукопожатие для подтверждения готовности удаленной стороны принимать наши запросы.
Успешное рукопожатие даст нам future [Connection][Connection], который обрабатывает все состояние HTTP, и структуру [SendRequest][SendRequest], которую мы можем использовать для отправки наших Request по соединению.
Чтобы начать управлять состоянием HTTP, мы должны опрашивать (poll) Connection, поэтому для завершения настройки мы породим задачу tokio::task и будем awaitить её.
extern crate http_body_util; extern crate hyper; extern crate hyper_util; extern crate tokio; use http_body_util::Empty; use hyper::body::Bytes; use hyper::Request; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { // Парсим наш URL... let url = "http://httpbin.org/ip".parse::<hyper::Uri>()?; // Получаем хост и порт let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let address = format!("{}:{}", host, port); // Открываем TCP-соединение с удаленным хостом let stream = TcpStream::connect(address).await?; // Используем адаптер для доступа к чему-то, реализующему трейты `tokio::io`, как если бы они реализовывали // трейты ввода-вывода `hyper::rt`. let io = TokioIo::new(stream); // Создаем клиент Hyper let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; // Порождаем задачу для опроса соединения, управляя состоянием HTTP tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); let authority = url.authority().unwrap().clone(); let req = Request::builder() .uri(url) .header(hyper::header::HOST, authority.as_str()) .body(Empty::<Bytes>::new())?; let mut res = sender.send_request(req).await?; Ok(()) } fn main() {}
GET
Теперь, когда мы настроили наше соединение, мы готовы сконструировать и отправить наш первый Request! Поскольку SendRequest не требует URI в абсолютной форме, мы обязаны включать заголовок HOST в наши запросы. И хотя мы можем отправить наш Request с пустым Body, нам нужно явно его установить, что мы сделаем с помощью утилитарной структуры [Empty][Empty].
Все, что нам нужно сделать теперь, это передать Request в SendRequest::send_request, это возвращает future, который разрешится в [Response][Response] от httpbin.org. Мы выведем статус ответа, чтобы убедиться, что он вернул ожидаемый статус 200 OK.
extern crate http_body_util; extern crate hyper; extern crate hyper_util; extern crate tokio; use http_body_util::Empty; use hyper::body::Bytes; use hyper::Request; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let url = "http://httpbin.org/ip".parse::<hyper::Uri>()?; let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; let io = TokioIo::new(stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); // Authority нашего URL будет именем хоста удаленного httpbin let authority = url.authority().unwrap().clone(); // Создаем HTTP запрос с пустым телом и заголовком HOST let req = Request::builder() .uri(url) .header(hyper::header::HOST, authority.as_str()) .body(Empty::<Bytes>::new())?; // Ожидаем ответ... let mut res = sender.send_request(req).await?; println!("Response status: {}", res.status()); Ok(()) } fn main() {}
Тела ответов
Мы знаем, что отправка GET Request на httpbin.org/ip вернет наш IP-адрес в теле Response. Чтобы увидеть возвращенное тело, мы просто запишем его в stdout.
Тела в hyper являются асинхронными потоками [Frame][Frame], поэтому нам не нужно ждать, пока все тело придет, буферизуя его в памяти, а затем записывать. Мы можем просто awaitить каждый Frame и записывать их непосредственно в stdout по мере поступления!
В дополнение к импорту stdout, нам нужно использовать трейт BodyExt:
#![allow(unused)] fn main() { extern crate http_body_util; extern crate tokio; use http_body_util::BodyExt; use tokio::io::{AsyncWriteExt as _, self}; }
extern crate http_body_util; extern crate hyper; extern crate hyper_util; extern crate tokio; use http_body_util::{BodyExt, Empty}; use hyper::body::Bytes; use hyper::Request; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; use tokio::io::{self, AsyncWriteExt as _}; async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let url = "http://httpbin.org/ip".parse::<hyper::Uri>()?; let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; let io = TokioIo::new(stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); let authority = url.authority().unwrap().clone(); let req = Request::builder() .uri(url) .header(hyper::header::HOST, authority.as_str()) .body(Empty::<Bytes>::new())?; let mut res = sender.send_request(req).await?; // Потоково передаем тело, записывая каждый фрейм в stdout по мере его поступления while let Some(next) = res.frame().await { let frame = next?; if let Some(chunk) = frame.data_ref() { io::stdout().write_all(chunk).await?; } } Ok(()) } fn main() {}
И это все! Вы можете увидеть [полный пример здесь][example].
[StatusCode]: {{ site.hyper_docs_url }}/hyper/struct.StatusCode.html [Response]: {{ site.http_docs_url }}/http/response/struct.Response.html [Request]: {{ site.http_docs_url }}/http/request/struct.Request.html [Connection]: {{ site.hyper_docs_url }}/hyper/client/conn/http1/struct.Connection.html [SendRequest]: {{ site.hyper_docs_url }}/hyper/client/conn/http1/struct.SendRequest.html [Frame]: {{ site.hyper_docs_url }}/hyper/body/struct.Frame.html [Empty]: {{ site.http_body_util_url }}/http_body_util/struct.Empty.html
[example]: {{ site.examples_url }}/client.rs
Коннекторы, Пулы соединений и HTTPS
Нестабильно: Код, обсуждаемый в этом руководстве, находится в
hyper-util, который не так стабилен, как код вhyper. Он готов к использованию в production, но изменения могут происходить чаще.
В РАЗРАБОТКЕ
Что такое коннектор?
В РАЗРАБОТКЕ
Пулы соединений
В РАЗРАБОТКЕ
HTTPS
hyper позволяет вам использовать вашу собственную реализацию IO, поэтому он может работать поверх любой реализации TLS. (TODO: ссылка на руководство по runtime)
Также существуют крейты, которые предоставляют "коннекторы", что приводит к простому в использовании HTTPS для legacy клиента в hyper-util. У каждого из них есть свои причины для существования, плюсы и минусы, но этот список предоставлен, чтобы помочь вам начать работу1:
-
Это не является одобрением какого-либо из крейтов, и все они поддерживаются отдельно от hyper. ↩
Руководства по POST запросам
POST
Теперь, когда мы увидели, как сделать GET запрос, давайте посмотрим, как сделать POST запрос. Это полезно, когда вам нужно отправить данные на сервер, например, при отправке формы или загрузке файла.
Чтобы сделать POST запрос, нам нужно изменить несколько вещей по сравнению с GET запросом:
- Мы установим метод в POST.
- Нам нужно предоставить тело запроса.
- Нам нужно указать тип данных в нашем теле, добавив заголовок
hyper::header::CONTENT_TYPE.
Для тела у нас есть несколько вариантов. Мы можем использовать простую строку, строку JSON или сырые байты. Давайте рассмотрим все три:
extern crate http_body_util; extern crate hyper; extern crate hyper_util; extern crate tokio; use http_body_util::Full; use hyper::body::Bytes; use hyper::Request; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let url = "http://httpbin.org/ip".parse::<hyper::Uri>()?; let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; let io = TokioIo::new(stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); } }); // Authority нашего URL будет именем хоста удаленного httpbin let authority = url.authority().unwrap().clone(); // Для простого текста let req_body = Full::<Bytes>::from("Some plain text as a body."); let req = Request::builder() .method(hyper::Method::POST) .header(hyper::header::HOST, authority.as_str()) .header(hyper::header::CONTENT_TYPE, "text/plain") .body(req_body)?; // Для JSON данных let json_data = r#"{"key": "value"}"#; let req_body = Full::<Bytes>::from(json_data); let req = Request::builder() .method(hyper::Method::POST) .header(hyper::header::HOST, authority.as_str()) .header(hyper::header::CONTENT_TYPE, "application/json") .body(req_body)?; // Для бинарных данных let binary_data = vec![0u8; 128]; // Пример бинарных данных let req_body = Full::<Bytes>::from(binary_data); let req = Request::builder() .method(hyper::Method::POST) .header(hyper::header::HOST, authority.as_str()) .header(hyper::header::CONTENT_TYPE, "application/octet-stream") .body(req_body)?; let res = sender.send_request(req).await?; println!("Response status: {}", res.status()); Ok(()) } fn main() {}
Обновление с v0.14 до v1
Это руководство предназначено для помощи в обновлении с версии 0.14 hyper до версии 1.
Возможно, вы искали руководства для v0.14?
Подготовка с помощью бэкпортов и устаревших функций
Перед обновлением вы можете начать подготовку вашей кодовой базы 0.14, включив функции backports и deprecated в hyper в вашем Cargo.toml. Вот так:
[dependencies]
hyper = { version = "0.14", features = ["etc", "backports", "deprecated"] }
Функция backports добавляет несколько новых типов из версии 1.0 в 0.14. Если вы также включите функцию deprecated, это добавит предупреждения об устаревании к любым типам hyper, для которых доступны прямые бэкпорты.
ПРИМЕЧАНИЕ: Это не даст вам предупреждений об изменениях, для которых не удалось предоставить бэкпорты.
Прочтите список изменений (Changelog)
Как общее правило, мы старались отметить каждое возможное критическое изменение в списке изменений. Просмотрите раздел "критические изменения" релизов версии 1.0 (включая RC 1-4), где будут предложения по преодолению каждого из них.
Body
Тип Body изменился и стал трейтом (то, что раньше было HttpBody).
Body из версии 0.14 мог иметь несколько вариантов, а в v1 они были разделены на отдельные типы. Вам будет полезно проанализировать каждое место, где вы используете hyper::Body, чтобы решить, на какое решение перейти.
- В общем случае, если вам не нужен конкретный вариант, рассмотрите возможность сделать ваше использование обобщенным, принимая
impl Body(илиwhere B: Body). - Если вы хотите тип, который может быть любым вариантом, вы можете использовать
BoxBody. - В противном случае более конкретные варианты позволяют создать более явный API в вашем коде.
Client
Клиент высокого уровня с пулом соединений Client был удален из hyper 1.0. Похожий тип был добавлен в hyper-util, называемый client::legacy::Client. В основном это прямая замена.
Server
Сервер hyper::Server из v0.14 не имеет прямой замены, поскольку у него были проблемы.
Для типа сервера, который может обрабатывать как HTTP/1, так и HTTP/2 одновременно, используйте server::conn::auto::Builder из hyper-util.
Серверный акцептор прослушивания можно заменить простым циклом.
Service/service_fn
Ранее hyper зависел от tower для трейта Service. Поскольку tower еще не достиг версии 1.0, hyper не мог публично зависеть от него. Поэтому его трейт Service и вспомогательная функция service_fn определены в hyper::service. Подробнее см. в middleware.
Примеры
Примеры использования hyper
Эти примеры показывают, как выполнять распространенные задачи с помощью hyper. Вам также могут быть полезны Руководства.
Если вы склонируете этот репозиторий, вы можете запустить любой из примеров командой:
cargo run --example {example_name} --features="full"
Зависимости
Полный список зависимостей, используемых в этих примерах:
[dependencies]
hyper = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
pretty_env_logger = "0.5"
http-body-util = "0.1"
bytes = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
form_urlencoded = "1"
http = "1"
futures-util = { version = "0.3", default-features = false }
Начало работы
Клиенты
-
client- Простой CLI HTTP-клиент, который запрашивает URL, переданный в параметрах, и выводит содержимое и детали ответа в stdout, читая содержимое по частям (chunk-by-chunk). -
client_json- Простая программа, которая GET-запросом получает JSON, читает тело асинхронно, парсит его с помощью serde и выводит результат.
Серверы
-
hello- Простой сервер, который возвращает "Hello World!". -
echo- Эхо-сервер, который копирует содержимое POST-запроса в содержимое ответа.
Более сложные примеры
-
gateway- Серверный шлюз (обратный прокси), который проксирует запросы к сервисуhello, описанному выше. -
graceful_shutdown- Сервер, который имеет таймаут для входящих соединений и выполняет грациозное завершение соединений. -
http_proxy- Простой HTTP(S) прокси, который обрабатывает и апгрейдитCONNECTзапросы, а затем проксирует данные между клиентом и удаленным сервером. -
multi_server- Сервер, который прослушивает два разных порта, с разнымиServiceдля каждого порта. -
params- Веб-сервер, который принимает форму с именем и числом, проверяет наличие параметров и валидирует ввод. -
send_file- Сервер, который отправляет содержимое файлов обратно, используя tokio-util для асинхронного чтения файлов. -
service_struct_impl- Структура, которая вручную реализует трейтServiceи использует общий счетчик между запросами. -
single_threaded- Сервер, работающий только в 1 потоке, что позволяет использовать состояние приложения!Send(например, счетчикRc). -
state- Веб-сервер, демонстрирующий базовое разделение состояния между запросами. Счетчик является общим, увеличивается при каждом запросе, и каждый ответ содержит последнее значение счетчика. -
upgrades- Сервер и клиент, демонстрирующие как выполнять HTTP апгрейды (такие как WebSockets). -
web_api- Сервер, состоящий из сервиса, который возвращает содержимое входящего POST-запроса в ответе в верхнем регистре, и сервиса, который вызывает первый сервис и включает ответ первого сервиса в свой собственный ответ.
tonic
Tonic
Реализация gRPC на Rust — высокопроизводительный, открытый, универсальный фреймворк для RPC, ориентированный на мобильные устройства и HTTP/2.
Примечание: В ветке master tonic в настоящее время готовятся критические изменения (breaking changes). Для работы с последней выпущенной версией кода обратитесь к ветке 0.14.x.
tonic — это реализация gRPC поверх HTTP/2, ориентированная на высокую производительность, совместимость и гибкость. Эта библиотека была создана для поддержки async/await "из коробки" и служит в качестве основного строительного блока для производственных систем, написанных на Rust.
Примеры | Веб-сайт | Документация | Чат
Обзор
tonic состоит из трёх основных компонентов: общая реализация gRPC, высокопроизводительная реализация HTTP/2 и генерация кода на основе prost. Общая реализация может работать с любой реализацией HTTP/2 и любой кодировкой через набор обобщённых трейтов. Реализация HTTP/2 основана на hyper — быстром клиенте и сервере для HTTP/1.1 и HTTP/2, построенном на основе надёжного стека tokio. Генерация кода включает инструменты для создания клиентов и серверов из определений protobuf.
Возможности
- Двунаправленные потоки (Bi-directional streaming)
- Высокопроизводительное асинхронное I/O
- Совместимость
- Поддержка TLS на основе
rustls - Балансировка нагрузки (Load balancing)
- Пользовательские метаданные
- Аутентификация
- Проверка работоспособности (Health Checking)
Начало работы
- Руководство
helloworldсодержит базовый пример использованияtonic, идеально подходящий для начинающих! - Руководство
routeguideпредоставляет полный пример использованияtonicи всех его функций.
Примеры кода можно найти в директории examples, а для более сложных сценариев interop может быть хорошим ресурсом, так как там показаны примеры многих функций gRPC.
Версия Rust
Минимальная поддерживаемая версия Rust (MSRV) для tonic — 1.75.
Зависимости
tonic-build использует компилятор protoc Protocol Buffers compiler в некоторых API, которые компилируют файлы ресурсов Protocol Buffers, такие как tonic_build::compile_protos().
Получение помощи
Сначала проверьте, есть ли ответ на ваш вопрос в документации по API. Если ответа там нет, есть активное сообщество в канале Tonic в Discord. Мы будем рады попытаться ответить на ваш вопрос. Если это не помогло, попробуйте создать issue с вашим вопросом.
Структура проекта
tonic: Общая реализация клиента и сервера для gRPC и HTTP/2.tonic-build: Генерация кода для служб на основеprost.tonic-types: Типы утилит gRPC на основеprost, включая поддержку Well Known Types gRPC.tonic-health: Реализация стандартной службы проверки работоспособности gRPC. Также служит примером как унарных вызовов, так и потоковой передачи ответов.tonic-reflection: Реализация механизма рефлексии gRPC на основе tonic.examples: Примеры реализаций gRPC, демонстрирующие TLS, балансировку нагрузки и двунаправленные потоки.interop: Реализация тестов на совместимость (interop).
Участие в разработке
:balloon: Спасибо за вашу помощь в улучшении проекта! Мы очень рады вашим участию! У нас есть руководство по участию, которое поможет вам внести вклад в проект Tonic.
Лицензия
Этот проект лицензирован по лицензии MIT.
Вклад
Если вы явно не указали иное, любой ваш вклад, намеренно представленный для включения в Tonic, будет лицензирован как MIT без каких-либо дополнительных условий.
Примечание переводчика: Ссылки и названия пакетов оставлены без изменения, так как они являются частью технической терминологии. Переведены пояснительные тексты, описания функций и разделов.
Примеры
Набор примеров, демонстрирующих возможности, предоставляемые tonic.
Для сборки этих примеров необходимо установить компилятор Protocol Buffers protoc, а также файлы ресурсов Protocol Buffers.
Установка зависимостей
Ubuntu:
sudo apt update && sudo apt upgrade -y
sudo apt install -y protobuf-compiler libprotobuf-dev
Alpine Linux:
sudo apk add protoc protobuf-dev
macOS:
Предполагается, что Homebrew уже установлен. (Если нет, см. инструкции по установке Homebrew на официальном сайте.)
brew install protobuf
Helloworld (Здравствуй, мир)
Клиент
$ cargo run --bin helloworld-client
Сервер
$ cargo run --bin helloworld-server
RouteGuide (Навигатор по маршрутам)
Клиент
$ cargo run --bin routeguide-client
Сервер
$ cargo run --bin routeguide-server
Authentication (Аутентификация)
Клиент
$ cargo run --bin authentication-client
Сервер
$ cargo run --bin authentication-server
Load Balance (Балансировка нагрузки)
Клиент
$ cargo run --bin load-balance-client
Сервер
$ cargo run --bin load-balance-server
Dynamic Load Balance (Динамическая балансировка нагрузки)
Клиент
$ cargo run --bin dynamic-load-balance-client
Сервер
$ cargo run --bin dynamic-load-balance-server
TLS (rustls)
Клиент
$ cargo run --bin tls-client
Сервер
$ cargo run --bin tls-server
Health Checking (Проверка работоспособности)
Сервер
$ cargo run --bin health-server
Server Reflection (Рефлексия сервера)
Сервер
$ cargo run --bin reflection-server
Tower Middleware (Промежуточное ПО Tower)
Сервер
$ cargo run --bin tower-server
Autoreloading Server (Сервер с автоматической перезагрузкой)
Сервер
systemfd --no-pid -s http::[::1]:50051 -- cargo watch -x 'run --bin autoreload-server'
Примечания:
Если вы используете функцию codegen, то следующие зависимости обязательны:
Для примера с автоперезагрузкой требуются следующие глобально установленные крейты:
Richer Error (Расширенная обработка ошибок)
Оба клиента и оба сервера делают одно и то же, но используют два разных подхода. Запустите один из серверов в одном терминале, а затем запустите клиенты в другом.
Клиент, использующий структуру ErrorDetails
$ cargo run --bin richer-error-client
Клиент, использующий вектор типов сообщений об ошибках
$ cargo run --bin richer-error-client-vec
Сервер, использующий структуру ErrorDetails
$ cargo run --bin richer-error-server
Сервер, использующий вектор типов сообщений об ошибках
$ cargo run --bin richer-error-server-vec
Hello world!
Начало работы
Это руководство представляет собой введение в Tonic и предполагает, что у вас есть базовый опыт работы с Rust, а также понимание того, что такое Protocol Buffers. Если это не так, можете ознакомиться со страницами, ссылки на которые приведены в этом параграфе, и вернуться к этому руководству, когда будете готовы!
Предварительные требования
Для запуска примеров кода и прохождения руководства единственным предварительным требованием является сам Rust. rustup — удобный инструмент для его установки, если у вас его ещё нет.
Настройка проекта
Для этого руководства мы начнём с создания нового Rust-проекта с помощью Cargo:
$ cargo new helloworld-tonic
$ cd helloworld-tonic
tonic работает на Rust 1.39 и выше, так как требует поддержки функциональности async_await.
$ rustup update
Определение сервиса HelloWorld
Наш первый шаг — определить gRPC-сервис, а также типы запросов и ответов методов, используя Protocol Buffers. Мы будем хранить наши .proto-файлы в директории в корне проекта. Обратите внимание, что Tonic не особо важен путь, по которому лежат наши .proto-определения.
$ mkdir proto
$ touch proto/helloworld.proto
Затем вы определяете RPC-методы внутри определения вашего сервиса, указывая их типы запросов и ответов. gRPC позволяет определять четыре вида методов сервиса, и все они поддерживаются Tonic. В этом руководстве мы будем использовать только простой RPC. Если вы хотите увидеть пример Tonic, использующий все четыре вида, пожалуйста, прочитайте руководство по routeguide.
Сначала мы определяем имя нашего пакета, которое Tonic ищет при подключении ваших protobuf-файлов в клиентские и серверные приложения. Дадим ему имя helloworld.
syntax = "proto3";
package helloworld;
Далее нам нужно определить наш сервис. Этот сервис будет содержать фактические RPC-вызовы, которые мы будем использовать в нашем приложении. RPC содержит идентификатор, тип запроса и возвращает тип ответа. Вот наш сервис Greeter, который предоставляет RPC-метод SayHello.
service Greeter {
// Наш RPC SayHello принимает HelloRequests и возвращает HelloReplies
rpc SayHello (HelloRequest) returns (HelloReply);
}
Наконец, мы должны определить те типы, которые использовали выше в нашем RPC-методе SayHello. Типы RPC определяются как сообщения (messages), которые содержат типизированные поля. Вот как это будет выглядеть для нашего приложения HelloWorld:
message HelloRequest {
// Сообщение запроса содержит имя для приветствия
string name = 1;
}
message HelloReply {
// Ответ содержит приветственное сообщение
string message = 1;
}
Отлично! Теперь наш .proto-файл должен быть завершён и готов к использованию в нашем приложении. Вот как он должен выглядеть в готовом виде:
syntax = "proto3";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
Настройка приложения
Теперь, когда мы определили protobuf для нашего приложения, мы можем начать писать наше приложение на Tonic! Давайте сначала добавим необходимые зависимости в Cargo.toml.
[package]
name = "helloworld-tonic"
version = "0.1.0"
edition = "2021"
[[bin]] # Исполняемый файл для запуска gRPC-сервера HelloWorld
name = "helloworld-server"
path = "src/server.rs"
[[bin]] # Исполняемый файл для запуска gRPC-клиента HelloWorld
name = "helloworld-client"
path = "src/client.rs"
[dependencies]
tonic = "*"
prost = "0.14"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
[build-dependencies]
tonic-build = "*"
Мы включаем tonic-build как удобный способ интегрировать генерацию нашего клиентского и серверного gRPC-кода в процесс сборки нашего приложения. Теперь мы настроим этот процесс сборки:
Генерация кода Сервера и Клиента
В корне вашего проекта (не в /src) создайте файл build.rs и добавьте следующий код:
fn main() -> Result<(), Box<dyn std::error::Error>> { tonic_build::compile_protos("proto/helloworld.proto")?; Ok(()) }
Это указывает tonic-build скомпилировать ваши protobuf-файлы при сборке вашего Rust-проекта. Хотя вы можете настроить этот процесс сборки различными способами, мы не будем вдаваться в подробности в этом вводном руководстве. Пожалуйста, ознакомьтесь с документацией по tonic-build для получения сведений о конфигурации.
Написание нашего Сервера
Теперь, когда процесс сборки написан и все наши зависимости настроены, мы можем начать писать интересные вещи! Нам нужно импортировать всё, что мы будем использовать на нашем сервере, включая protobuf-файл. Начните с создания файла server.rs в вашей директории /src и напишите следующий код:
#![allow(unused)] fn main() { use tonic::{transport::Server, Request, Response, Status}; use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; pub mod hello_world { tonic::include_proto!("helloworld"); // Строка, указанная здесь, должна совпадать с именем proto-пакета } }
Далее давайте реализуем сервис Greeter, который мы ранее определили в нашем .proto-файле. Вот как это может выглядеть:
#![allow(unused)] fn main() { #[derive(Debug, Default)] pub struct MyGreeter {} #[tonic::async_trait] impl Greeter for MyGreeter { async fn say_hello( &self, request: Request<HelloRequest>, // Принимаем запрос типа HelloRequest ) -> Result<Response<HelloReply>, Status> { // Возвращаем экземпляр типа HelloReply println!("Получен запрос: {:?}", request); let reply = HelloReply { // Мы должны использовать .into_inner(), так как поля gRPC-запросов и ответов приватны message: format!("Привет, {}!", request.into_inner().name), }; Ok(Response::new(reply)) // Отправляем обратно наше отформатированное приветствие } } }
Наконец, давайте определим рантайм Tokio, на котором фактически будет работать наш сервер. Это требует добавления Tokio в качестве зависимости, так что убедитесь, что вы его включили!
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); Server::builder() .add_service(GreeterServer::new(greeter)) .serve(addr) .await?; Ok(()) }
В целом ваш сервер должен выглядеть примерно так, когда вы закончите:
use tonic::{transport::Server, Request, Response, Status}; use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; pub mod hello_world { tonic::include_proto!("helloworld"); } #[derive(Debug, Default)] pub struct MyGreeter {} #[tonic::async_trait] impl Greeter for MyGreeter { async fn say_hello( &self, request: Request<HelloRequest>, ) -> Result<Response<HelloReply>, Status> { println!("Получен запрос: {:?}", request); let reply = HelloReply { message: format!("Привет, {}!", request.into_inner().name), }; Ok(Response::new(reply)) } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); Server::builder() .add_service(GreeterServer::new(greeter)) .serve(addr) .await?; Ok(()) }
Теперь вы должны иметь возможность запустить ваш gRPC-сервер HelloWorld с помощью команды cargo run --bin helloworld-server. Это использует [[bin]], который мы определили ранее в нашем Cargo.toml, чтобы запустить именно сервер.
Если у вас есть GUI-клиент для gRPC, такой как Postman, вы сможете отправлять запросы на сервер и получать приветствия обратно!
Или, если вы используете grpcurl, то вы можете просто попробовать отправить запросы так:
$ grpcurl -plaintext -import-path ./proto -proto helloworld.proto -d '{"name": "Tonic"}' '[::1]:50051' helloworld.Greeter/SayHello
И получать ответы вида:
{
"message": "Hello Tonic!"
}
Написание нашего Клиента
Итак, теперь у нас есть работающий gRPC-сервер, и это здорово, но как наше приложение может с ним взаимодействовать? Здесь нам пригодится наш клиент. Tonic поддерживает реализации как клиента, так и сервера. Аналогично серверу, мы начнём с создания файла client.rs в нашей директории /src и импорта всего, что нам понадобится:
#![allow(unused)] fn main() { use hello_world::greeter_client::GreeterClient; use hello_world::HelloRequest; pub mod hello_world { tonic::include_proto!("helloworld"); } }
Клиент намного проще сервера, так как нам не нужно реализовывать какие-либо методы сервиса, а только делать запросы. Вот рантайм Tokio, который сделает наш запрос и выведет ответ в ваш терминал:
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = GreeterClient::connect("http://[::1]:50051").await?; let request = tonic::Request::new(HelloRequest { name: "Tonic".into(), }); let response = client.say_hello(request).await?; println!("ОТВЕТ={:?}", response); Ok(()) }
Вот и всё! Наш завершённый клиентский файл должен выглядеть примерно так, как показано ниже. Если это не так, вернитесь и убедитесь, что вы всё сделали правильно:
use hello_world::greeter_client::GreeterClient; use hello_world::HelloRequest; pub mod hello_world { tonic::include_proto!("helloworld"); } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = GreeterClient::connect("http://[::1]:50051").await?; let request = tonic::Request::new(HelloRequest { name: "Tonic".into(), }); let response = client.say_hello(request).await?; println!("ОТВЕТ={:?}", response); Ok(()) }
Собираем всё вместе
На этом этапе мы написали наш protobuf-файл, файл сборки для компиляции наших protobuf-файлов, сервер, который реализует наш сервис SayHello, и клиент, который делает запросы к нашему серверу. У вас должен быть файл proto/helloworld.proto, файл build.rs в корне проекта, а также файлы src/server.rs и src/client.rs.
Чтобы запустить сервер, выполните cargo run --bin helloworld-server.
Чтобы запустить клиент, выполните cargo run --bin helloworld-client в другом окне терминала.
Вы должны увидеть запрос, залогированный сервером в его окне терминала, а также ответ, залогированный клиентом в его окне.
Поздравляем с прохождением этого вводного руководства! Мы надеемся, что это пошаговое руководство помогло вам понять основы Tonic и то, как начать писать высокопроизводительные, совместимые и гибкие gRPC-серверы на Rust. Для более углублённого руководства, демонстрирующего продвинутый gRPC-сервер на Tonic, пожалуйста, ознакомьтесь с [руководством по routeguide].
Основы gRPC: Tonic
Это руководство, адаптированное из grpc-go, предоставляет базовое введение в работу с gRPC и Tonic. Изучая этот пример, вы научитесь:
- Определять сервис в файле
.proto. - Генерировать код сервера и клиента.
- Писать простой клиент и сервер для вашего сервиса.
Предполагается, что вы знакомы с Protocol Buffers и основами Rust. Обратите внимание, что пример в этом руководстве использует версию proto3 языка Protocol Buffers. Подробнее об этом можно узнать в руководстве по языку proto3.
Зачем использовать gRPC?
Наш пример — простое приложение для построения маршрутов, которое позволяет клиентам получать информацию об особенностях на их маршруте, создавать сводку маршрута и обмениваться информацией о маршруте, такой как обновления о дорожном движении, с сервером и другими клиентами.
С gRPC мы можем один раз определить наш сервис в файле .proto и реализовать клиенты и серверы на любом из поддерживаемых gRPC языков, которые, в свою очередь, могут работать в средах от серверов внутри Google до вашего собственного планшета — вся сложность общения между разными языками и средами обрабатывается за вас gRPC. Мы также получаем все преимущества работы с Protocol Buffers, включая эффективную сериализацию, простой IDL и легкое обновление интерфейсов.
Предварительные требования
Для запуска примеров кода и прохождения руководства единственным предварительным требованием является сам Rust. rustup — удобный инструмент для его установки, если у вас его ещё нет.
Запуск примера
Клонируйте или скачайте репозиторий Tonic:
$ git clone https://github.com/hyperium/tonic.git
Перейдите в корневую директорию репозитория Tonic:
$ cd tonic
Запустите сервер
$ cargo run --bin routeguide-server
В отдельной оболочке запустите клиент
$ cargo run --bin routeguide-client
Вы должны увидеть быстрый вывод логов в обоих окнах терминала. В оболочке, где вы запустили клиентский бинарник, вы должны увидеть вывод двунаправленного потокового RPC, печатающий по одной строке в секунду:
NOTE = RouteNote { location: Some(Point { latitude: 409146139, longitude: -746188906 }), message: "at 1.000319208s" }
Если прокрутите вверх, вы должны увидеть вывод других 3 типов запросов: простой RPC, серверный поток и клиентский поток.
Настройка проекта
Мы будем разрабатывать наш пример с нуля в новом крейте:
$ cargo new routeguide
$ cd routeguide
Определение сервиса
Наш первый шаг — определить gRPC-сервис и типы запросов и ответов методов, используя Protocol Buffers. Мы будем хранить наши .proto-файлы в директории в корне нашего крейта. Обратите внимание, что Tonic не особо важен путь, по которому лежат наши .proto-определения. Позже в руководстве мы увидим, как использовать различную конфигурацию генерации кода.
$ mkdir proto && touch proto/route_guide.proto
Полный .proto-файл можно посмотреть в examples/proto/routeguide/route_guide.proto.
Чтобы определить сервис, вы указываете именованный service в вашем .proto-файле:
service RouteGuide {
...
}
Затем вы определяете rpc-методы внутри определения вашего сервиса, указывая их типы запросов и ответов. gRPC позволяет определять четыре вида методов сервиса, и все они используются в сервисе RouteGuide:
- Простой RPC, где клиент отправляет запрос на сервер и ждет ответа, прямо как при обычном вызове функции.
// Получает объект (Feature) в заданной позиции.
rpc GetFeature(Point) returns (Feature) {}
- Серверный потоковый RPC, где клиент отправляет запрос на сервер и получает поток для чтения последовательности сообщений. Клиент читает из возвращенного потока, пока сообщения не закончатся. Как видно в нашем примере, серверный потоковый метод указывается путем размещения ключевого слова
streamперед типом ответа.
// Получает объекты (Features), доступные в пределах заданного Прямоугольника (Rectangle). Результаты передаются потоком, а не возвращаются сразу (например, в сообщении ответа с повторяющимся полем), так как прямоугольник может покрывать большую площадь и содержать огромное количество объектов.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- Клиентский потоковый RPC, где клиент записывает последовательность сообщений и отправляет их на сервер. После того как клиент закончил записывать сообщения, он ждет, пока сервер прочитает их все и вернет свой ответ. Клиентский потоковый метод указывается путем размещения ключевого слова
streamперед типом запроса.
// Принимает поток Точек (Points) на traversed маршруте, возвращая RouteSummary после завершения обхода.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- Двунаправленный потоковый RPC, где обе стороны отправляют последовательность сообщений. Два потока работают независимо, поэтому клиенты и серверы могут читать и писать в любом порядке: например, сервер может ждать получения всех клиентских сообщений перед записью своих ответов, или он может поочередно читать сообщение, а затем писать сообщение, или какая-либо другая комбинация чтения и записи. Порядок сообщений в каждом потоке сохраняется. Этот тип метода указывается путем размещения ключевого слова
streamкак перед запросом, так и перед ответом.
// Принимает поток RouteNotes, отправляемых во время прохождения маршрута, одновременно получая другие RouteNotes (например, от других пользователей).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
Наш .proto-файл также содержит определения типов сообщений Protocol Buffers для всех типов запросов и ответов, используемых в наших методах сервиса — например, вот тип сообщения Point:
// Точки представлены как пары широта-долгота в представлении E7 (градусы, умноженные на 10**7 и округленные до ближайшего целого числа).
// Широты должны быть в диапазоне +/- 90 градусов, а долготы должны быть в диапазоне +/- 180 градусов (включительно).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Генерация кода клиента и сервера
Tonic можно настроить для генерации кода как часть обычного процесса сборки Cargo. Это очень удобно, потому что после настройки всего не требуется дополнительных шагов для поддержания синхронизации сгенерированного кода и наших .proto-определений.
За кулисами Tonic использует PROST! для обработки сериализации Protocol Buffers и генерации кода.
Отредактируйте Cargo.toml и добавьте все зависимости, которые нам понадобятся для этого примера:
[dependencies]
tonic = "*"
prost = "0.14"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = "0.1"
async-stream = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8"
[build-dependencies]
tonic-build = "*"
Создайте файл build.rs в корне вашего крейта:
fn main() { tonic_build::compile_protos("proto/route_guide.proto") .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); }
$ cargo build
Вот и всё. Сгенерированный код содержит:
- Определения структур для типов сообщений
Point,Rectangle,Feature,RouteNote,RouteSummary. - Трейт сервиса, который нам нужно реализовать:
route_guide_server::RouteGuide. - Тип клиента, который мы будем использовать для вызова сервера:
route_guide_client::RouteGuideClient<T>.
Если вам интересно, где находятся сгенерированные файлы, читайте дальше. Скоро тайна будет раскрыта! Теперь мы можем перейти к интересной части.
Создание сервера
Сначала давайте посмотрим, как мы создаем сервер RouteGuide. Если вас интересует только создание gRPC-клиентов, вы можете пропустить этот раздел и перейти прямо к Созданию клиента (хотя вам всё равно может быть интересно!).
Есть две части, чтобы заставить наш сервис RouteGuide выполнять свою работу:
- Реализация трейта сервиса, сгенерированного из нашего определения сервиса.
- Запуск gRPC-сервера для прослушивания запросов от клиентов.
Вы можете найти наш пример сервера RouteGuide в examples/src/routeguide/server.rs.
Реализация трейта сервера RouteGuide
Мы можем начать с определения структуры для представления нашего сервиса, пока мы можем сделать это в main.rs:
#![allow(unused)] fn main() { #[derive(Debug)] struct RouteGuideService; }
Далее нам нужно реализовать трейт route_guide_server::RouteGuide, который генерируется на этапе нашей сборки. Сгенерированный код помещается внутрь нашей целевой директории, в местоположение, определяемое переменной окружения OUT_DIR, которую устанавливает Cargo. Для нашего примера это означает, что вы можете найти сгенерированный код по пути, похожему на target/debug/build/routeguide/out/routeguide.rs.
Вы можете узнать больше о build.rs и переменной окружения OUT_DIR в книге Cargo.
Мы можем использовать макрос include_proto от Tonic, чтобы внести сгенерированный код в область видимости:
#![allow(unused)] fn main() { pub mod routeguide { tonic::include_proto!("routeguide"); } use routeguide::route_guide_server::{RouteGuide, RouteGuideServer}; use routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary}; }
Примечание: Лексема, передаваемая макросу include_proto (в нашем случае "routeguide"), — это имя пакета, объявленного в нашем .proto-файле, а не имя файла, например "routeguide.rs".
После этого мы можем набросать заглушку для нашей реализации сервиса:
#![allow(unused)] fn main() { use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use tokio_stream::{wrappers::ReceiverStream, Stream}; }
#![allow(unused)] fn main() { #[tonic::async_trait] impl RouteGuide for RouteGuideService { async fn get_feature(&self, _request: Request<Point>) -> Result<Response<Feature>, Status> { unimplemented!() } type ListFeaturesStream = ReceiverStream<Result<Feature, Status>>; async fn list_features( &self, _request: Request<Rectangle>, ) -> Result<Response<Self::ListFeaturesStream>, Status> { unimplemented!() } async fn record_route( &self, _request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { unimplemented!() } type RouteChatStream = Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>; async fn route_chat( &self, _request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<Self::RouteChatStream>, Status> { unimplemented!() } } }
Примечание: Атрибутный макрос tonic::async_trait добавляет поддержку асинхронных функций в трейтах. Внутри он использует async-trait. Вы можете узнать больше об async fn в трейтах в Async Book.
Состояние сервера
Нашему сервису нужен доступ к неизменяемому списку объектов (features). При запуске сервера мы собираемся десериализовать их из json-файла и хранить как наше единственное общее состояние:
#![allow(unused)] fn main() { #[derive(Debug)] pub struct RouteGuideService { features: Arc<Vec<Feature>>, } }
Создайте json-файл с данными и вспомогательный модуль для чтения и десериализации наших объектов.
$ mkdir data && touch data/route_guide_db.json
$ touch src/data.rs
Вы можете найти наш пример json-данных в examples/data/route_guide_db.json и соответствующий модуль data для загрузки и десериализации в examples/routeguide/data.rs.
Примечание: Если вы следуете руководству, вам нужно будет изменить путь к файлу данных с examples/data/route_guide_db.json на data/route_guide_db.json.
Наконец, нам нужно реализовать две вспомогательные функции: in_range и calc_distance. Мы будем использовать их при поиске объектов. Вы можете найти их в examples/src/routeguide/server.rs.
Типы запросов и ответов
Все наши методы сервиса получают tonic::Request<T> и возвращают Result<tonic::Response<T>, tonic::Status>. Конкретный тип T зависит от того, как наши методы объявлены в нашем сервисном определении .proto. Это может быть:
- Единичное значение, например
Point,Rectangle, или даже тип сообщения, включающий повторяющееся поле. - Поток значений, например
impl Stream<Item = Result<Feature, tonic::Status>>.
Простой RPC
Давайте сначала рассмотрим самый простой метод, get_feature, который просто получает tonic::Request<Point> от клиента и пытается найти объект (Feature) в данной Point. Если объект не найден, возвращается пустой.
#![allow(unused)] fn main() { async fn get_feature(&self, request: Request<Point>) -> Result<Response<Feature>, Status> { for feature in &self.features[..] { if feature.location.as_ref() == Some(request.get_ref()) { return Ok(Response::new(feature.clone())); } } Ok(Response::new(Feature::default())) } }
Серверный потоковый RPC
Теперь давайте посмотрим на один из наших потоковых RPC. list_features — это серверный потоковый RPC, поэтому нам нужно отправить обратно несколько Feature нашему клиенту.
#![allow(unused)] fn main() { type ListFeaturesStream = ReceiverStream<Result<Feature, Status>>; async fn list_features( &self, request: Request<Rectangle>, ) -> Result<Response<Self::ListFeaturesStream>, Status> { let (tx, rx) = mpsc::channel(4); let features = self.features.clone(); tokio::spawn(async move { for feature in &features[..] { if in_range(feature.location.as_ref().unwrap(), request.get_ref()) { tx.send(Ok(feature.clone())).await.unwrap(); } } }); Ok(Response::new(ReceiverStream::new(rx))) } }
Как и get_feature, входные данные list_features — это одно сообщение, в данном случае Rectangle. Однако на этот раз нам нужно вернуть поток значений, а не одно. Мы создаем канал и порождаем новую асинхронную задачу, в которой выполняем поиск, отправляя объекты, удовлетворяющие нашим ограничениям, в канал.
Половина Stream канала возвращается вызывающей стороне, обернутая в tonic::Response.
Клиентский потоковый RPC
Теперь давайте рассмотрим что-то немного более сложное: клиентский потоковый метод record_route, где мы получаем поток Point от клиента и возвращаем одно значение RouteSummary с информацией об их поездке. Как вы можете видеть, на этот раз метод получает tonic::Request<tonic::Streaming<Point>>.
#![allow(unused)] fn main() { use std::time::Instant; use tokio_stream::StreamExt; }
#![allow(unused)] fn main() { async fn record_route( &self, request: Request<tonic::Streaming<Point>>, ) -> Result<Response<RouteSummary>, Status> { let mut stream = request.into_inner(); let mut summary = RouteSummary::default(); let mut last_point = None; let now = Instant::now(); while let Some(point) = stream.next().await { let point = point?; summary.point_count += 1; for feature in &self.features[..] { if feature.location.as_ref() == Some(&point) { summary.feature_count += 1; } } if let Some(ref last_point) = last_point { summary.distance += calc_distance(last_point, &point); } last_point = Some(point); } summary.elapsed_time = now.elapsed().as_secs() as i32; Ok(Response::new(summary)) } }
record_route концептуально прост: мы получаем поток Point и сворачиваем его в RouteSummary. Другими словами, мы строим суммарное значение по мере обработки каждого Point в нашем потоке, один за другим. Когда в нашем потоке больше нет Point, мы возвращаем RouteSummary, обернутый в tonic::Response.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC route_chat, который получает поток RouteNote и возвращает либо другой поток RouteNote, либо ошибку.
#![allow(unused)] fn main() { use std::collections::HashMap; }
#![allow(unused)] fn main() { type RouteChatStream = Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>; async fn route_chat( &self, request: Request<tonic::Streaming<RouteNote>>, ) -> Result<Response<Self::RouteChatStream>, Status> { let mut notes = HashMap::new(); let mut stream = request.into_inner(); let output = async_stream::try_stream! { while let Some(note) = stream.next().await { let note = note?; let location = note.location.unwrap(); let location_notes = notes.entry(location).or_insert(vec![]); location_notes.push(note); for note in location_notes { yield note.clone(); } } }; Ok(Response::new(Box::pin(output) as Self::RouteChatStream)) } }
route_chat использует крейт async-stream для выполнения асинхронного преобразования из одного (входного) потока в другой (выходной) поток. По мере обработки входных данных каждое значение вставляется в карту notes, при этом создается клон исходного RouteNote. Результирующий поток затем возвращается вызывающей стороне. Аккуратно.
Примечание: Забавное приведение as необходимо из-за ограничения в компиляторе Rust. Ожидается, что это скоро будет исправлено.
Запуск сервера
После того как мы реализовали все наши методы, нам также нужно запустить gRPC-сервер, чтобы клиенты могли фактически использовать наш сервис. Вот как выглядит наша функция main:
#![allow(unused)] fn main() { mod data; use tonic::transport::Server; }
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let addr = "[::1]:10000".parse().unwrap(); let route_guide = RouteGuideService { features: Arc::new(data::load()), }; let svc = RouteGuideServer::new(route_guide); Server::builder().add_service(svc).serve(addr).await?; Ok(()) }
Для обработки запросов Tonic внутренне использует Tower и hyper. Что это значит, среди прочего, так это то, что у нас есть гибкий и компонуемый стек, на котором мы можем строить. Мы можем, например, добавить интерцептор для обработки запросов до того, как они достигнут наших методов сервиса.
Создание клиента
В этом разделе мы рассмотрим создание клиента Tonic для нашего сервиса RouteGuide. Вы можете увидеть наш полный пример клиентского кода в examples/src/routeguide/client.rs.
Наш крэйт будет иметь две бинарные цели: routeguide-client и routeguide-server. Нам нужно соответствующим образом отредактировать наш Cargo.toml:
[[bin]]
name = "routeguide-server"
path = "src/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client.rs"
Переименуйте main.rs в server.rs и создайте новый файл client.rs.
$ mv src/main.rs src/server.rs
$ touch src/client.rs
Чтобы вызывать методы сервиса, нам сначала нужно создать gRPC-клиент для связи с сервером. Как и в случае с сервером, мы начнем с внесения сгенерированного кода в область видимости:
pub mod routeguide { tonic::include_proto!("routeguide"); } use routeguide::route_guide_client::RouteGuideClient; use routeguide::{Point, Rectangle, RouteNote}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = RouteGuideClient::connect("http://[::1]:10000").await?; Ok(()) }
Как и в реализации сервера, мы начинаем с внесения нашего сгенерированного кода в область видимости. Затем мы создаем клиента в нашей основной функции, передавая полный URL сервера в RouteGuideClient::connect. Наш клиент теперь готов делать сервисные вызовы. Обратите внимание, что client является изменяемым, это потому, что ему нужно управлять внутренним состоянием.
Вызов методов сервиса
Теперь давайте посмотрим, как мы вызываем наши методы сервиса. Обратите внимание, что в Tonic RPC являются асинхронными, что означает, что вызовы RPC нужно .awaitить.
Простой RPC
Вызов простого RPC get_feature так же прост, как вызов локального метода:
#![allow(unused)] fn main() { use tonic::Request; }
#![allow(unused)] fn main() { let response = client .get_feature(Request::new(Point { latitude: 409146138, longitude: -746188906, })) .await?; println!("RESPONSE = {:?}", response); }
Мы вызываем клиентский метод get_feature, передавая единственное значение Point, обернутое в tonic::Request. Мы получаем обратно Result<tonic::Response<Feature>, tonic::Status>.
Серверный потоковый RPC
Вот где мы вызываем серверный потоковый метод list_features, который возвращает поток географических Feature.
#![allow(unused)] fn main() { use tonic::transport::Channel; use std::error::Error; }
#![allow(unused)] fn main() { async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> { let rectangle = Rectangle { lo: Some(Point { latitude: 400000000, longitude: -750000000, }), hi: Some(Point { latitude: 420000000, longitude: -730000000, }), }; let mut stream = client .list_features(Request::new(rectangle)) .await? .into_inner(); while let Some(feature) = stream.message().await? { println!("FEATURE = {:?}", feature); } Ok(()) } }
Как и в простом RPC, мы передаем запрос с одним значением. Однако вместо того, чтобы получить одно значение обратно, мы получаем поток Feature.
Мы используем метод message() из структуры tonic::Streaming, чтобы повторно читать ответы сервера в объект protobuf ответа (в данном случае Feature), пока в потоке не останется больше сообщений.
Клиентский потоковый RPC
Клиентский потоковый метод record_route принимает поток Point и возвращает одно значение RouteSummary.
#![allow(unused)] fn main() { use rand::rngs::ThreadRng; use rand::Rng; }
#![allow(unused)] fn main() { async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> { let mut rng = rand::thread_rng(); let point_count: i32 = rng.gen_range(2..100); let mut points = vec![]; for _ in 0..=point_count { points.push(random_point(&mut rng)) } println!("Traversing {} points", points.len()); let request = Request::new(tokio_stream::iter(points)); match client.record_route(request).await { Ok(response) => println!("SUMMARY: {:?}", response.into_inner()), Err(e) => println!("something went wrong: {:?}", e), } Ok(()) } }
#![allow(unused)] fn main() { fn random_point(rng: &mut ThreadRng) -> Point { let latitude = (rng.gen_range(0..180) - 90) * 10_000_000; let longitude = (rng.gen_range(0..360) - 180) * 10_000_000; Point { latitude, longitude, } } }
Мы строим вектор случайного количества значений Point (от 2 до 100), а затем преобразуем его в Stream с помощью функции tokio_stream::iter. Это дешевый и простой способ получить поток, подходящий для передачи в наш метод сервиса. Полученный поток затем оборачивается в tonic::Request.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC. Метод route_chat принимает поток RouteNote и возвращает либо другой поток RouteNote, либо ошибку.
#![allow(unused)] fn main() { use std::time::Duration; use tokio::time; }
#![allow(unused)] fn main() { async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> { let start = time::Instant::now(); let outbound = async_stream::stream! { let mut interval = time::interval(Duration::from_secs(1)); while let time = interval.tick().await { let elapsed = time.duration_since(start); let note = RouteNote { location: Some(Point { latitude: 409146138 + elapsed.as_secs() as i32, longitude: -746188906, }), message: format!("at {:?}", elapsed), }; yield note; } }; let response = client.route_chat(Request::new(outbound)).await?; let mut inbound = response.into_inner(); while let Some(note) = inbound.message().await? { println!("NOTE = {:?}", note); } Ok(()) } }
В этом случае мы используем крейт async-stream для генерации нашего исходящего потока, создавая значения RouteNote с интервалом в одну секунду. Затем мы перебираем поток, возвращенный сервером, печатая каждое значение в потоке.
Попробуйте!
Запустите сервер
$ cargo run --bin routeguide-server
Запустите клиент
$ cargo run --bin routeguide-client
Приложение
Конфигурация tonic_build
Конфигурация генерации кода по умолчанию в Tonic удобна для самодостаточных примеров и небольших проектов. Однако есть случаи, когда нам нужен немного другой рабочий процесс. Например:
- При сборке Rust-клиентов и серверов в разных крейтах.
- При сборке Rust-клиента или сервера (или обоих) в рамках более крупного многопользовательского проекта.
- Когда мы хотим поддержку редактора для сгенерированного кода, а наш редактор не индексирует сгенерированные файлы в местоположении по умолчанию.
Более общо, когда мы хотим хранить наши .proto-определения в центральном месте и генерировать код для разных крейтов или разных языков, конфигурации по умолчанию недостаточно.
К счастью, tonic_build можно настроить в соответствии с любыми нашими потребностями. Вот всего две возможности:
- Мы можем хранить наши
.proto-определения в отдельном крейте и генерировать наш код по требованию, в отличие от времени сборки, помещая результирующие модули туда, где они нам нужны.
main.rs
fn main() { tonic_build::configure() .build_client(false) .out_dir("another_crate/src/pb") .compile(&["path/my_proto.proto"], &["path"]) .expect("failed to compile protos"); }
При cargo run это сгенерирует код только для сервера и поместит результирующий файл в another_crate/src/pb.
- Аналогично, мы также можем хранить
.proto-определения в отдельном крейте, а затем использовать этот крэйт как прямую зависимость везде, где он нам нужен.