I/O
Ввод-вывод в Tokio работает во многом так же, как и в std, но асинхронно. Существует трейт для чтения (AsyncRead) и трейт для записи (AsyncWrite). Конкретные типы реализуют эти трейты соответствующим образом (TcpStream, File, Stdout). AsyncRead и AsyncWrite также реализованы для ряда структур данных, таких как Vec<u8> и &[u8]. Это позволяет использовать байтовые массивы там, где ожидается читатель или писатель.
На этой странице будут рассмотрены базовое чтение и запись ввода-вывода с Tokio и пройдены несколько примеров. На следующей странице мы перейдем к более продвинутому примеру ввода-вывода.
AsyncRead и AsyncWrite
Эти два трейта предоставляют возможности для асинхронного чтения из байтовых потоков и записи в них. Методы этих трейтов обычно не вызываются напрямую, аналогично тому, как вы не вызываете вручную метод poll из трейта Future. Вместо этого вы будете использовать их через вспомогательные методы, предоставляемые AsyncReadExt и AsyncWriteExt.
Давайте кратко рассмотрим несколько из этих методов. Все эти функции являются async и должны использоваться с .await.
async fn read()
AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращая количество прочитанных байтов.
Примечание: когда read() возвращает Ok(0), это означает, что поток закрыт. Любые последующие вызовы read() будут немедленно завершаться с Ok(0). Для экземпляров TcpStream это означает, что половина сокета для чтения закрыта.
use tokio::fs::File; use tokio::io::{self, AsyncReadExt}; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = [0; 10]; // читаем до 10 байт let n = f.read(&mut buffer[..]).await?; println!("The bytes: {:?}", &buffer[..n]); Ok(()) } }
async fn read_to_end()
AsyncReadExt::read_to_end читает все байты из потока до EOF.
use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = Vec::new(); // читаем весь файл f.read_to_end(&mut buffer).await?; Ok(()) } }
async fn write()
AsyncWriteExt::write записывает буфер в писатель, возвращая количество записанных байтов.
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; // Записывает некоторый префикс байтовой строки, но не обязательно всю. let n = file.write(b"some bytes").await?; println!("Wrote the first {} bytes of 'some bytes'.", n); Ok(()) } }
async fn write_all()
AsyncWriteExt::write_all записывает весь буфер в писатель.
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; file.write_all(b"some bytes").await?; Ok(()) } }
Оба трейта включают множество других полезных методов. См. документацию API для полного списка.
Вспомогательные функции
Кроме того, как и в std, модуль tokio::io содержит множество полезных утилитных функций, а также API для работы со стандартным вводом, стандартным выводом и стандартной ошибкой. Например, tokio::io::copy асинхронно копирует все содержимое читателя в писатель.
use tokio::fs::File; use tokio::io; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let mut reader: &[u8] = b"hello"; let mut file = File::create("foo.txt").await?; io::copy(&mut reader, &mut file).await?; Ok(()) } }
Обратите внимание, что здесь используется тот факт, что байтовые массивы также реализуют AsyncRead.
Эхо-сервер
Давайте попрактикуемся в асинхронном вводе-выводе. Мы напишем эхо-сервер.
Эхо-сервер привязывает TcpListener и принимает входящие соединения в цикле. Для каждого входящего соединения данные читаются из сокета и немедленно записываются обратно в сокет. Клиент отправляет данные на сервер и получает точно такие же данные обратно.
Мы реализуем эхо-сервер дважды, используя немного разные стратегии.
Использование io::copy()
Для начала мы реализуем логику эха с помощью утилиты io::copy.
Вы можете написать этот код в новом бинарном файле:
$ touch src/bin/echo-server-copy.rs
Который вы можете запустить (или просто проверить компиляцию) с помощью:
$ cargo run --bin echo-server-copy
Вы сможете испытать сервер, используя стандартное командное средство, такое как telnet, или написав простой клиент, подобный тому, который находится в документации для tokio::net::TcpStream.
Это TCP-сервер, и ему нужен цикл принятия. Для каждого принятого сокета создается новая задача.
use tokio::io; use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { // Копируем данные здесь }); } } }
Как было показано ранее, эта утилитная функция принимает читателя и писателя и копирует данные из одного в другой. Однако у нас есть только один TcpStream. Этот единственный значение реализует и AsyncRead, и AsyncWrite. Поскольку io::copy требует &mut как для читателя, так и для писателя, сокет не может быть использован для обоих аргументов.
#![allow(unused)] fn main() { // Это не компилируется io::copy(&mut socket, &mut socket).await }
Разделение читателя + писателя
Чтобы обойти эту проблему, мы должны разделить сокет на handle читателя и handle писателя. Лучший способ разделить комбинацию читатель/писатель зависит от конкретного типа.
Любой тип читатель + писатель может быть разделен с помощью утилиты io::split. Эта функция принимает одно значение и возвращает отдельные handle читателя и писателя. Эти два handle могут использоваться независимо, в том числе из отдельных задач.
Например, эхо-клиент может обрабатывать конкурентные чтение и запись следующим образом:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let socket = TcpStream::connect("127.0.0.1:6142").await?; let (mut rd, mut wr) = io::split(socket); // Записываем данные в фоне tokio::spawn(async move { wr.write_all(b"hello\r\n").await?; wr.write_all(b"world\r\n").await?; // Иногда rust выводу типов нужна небольшая помощь Ok::<_, io::Error>(()) }); let mut buf = vec![0; 128]; loop { let n = rd.read(&mut buf).await?; if n == 0 { break; } println!("GOT {:?}", &buf[..n]); } Ok(()) } }
Поскольку io::split поддерживает любое значение, которое реализует AsyncRead + AsyncWrite, и возвращает независимые handle, внутри io::split используется Arc и Mutex. Этих накладных расходов можно избежать с TcpStream. TcpStream предлагает две специализированные функции разделения.
TcpStream::split принимает ссылку на поток и возвращает handle читателя и писателя. Поскольку используется ссылка, оба handle должны оставаться в той же задаче, из которой был вызван split(). Это специализированное split является бесплатным. Не нужны Arc или Mutex. TcpStream также предоставляет into_split, который поддерживает handle, которые могут перемещаться между задачами, ценой только Arc.
Поскольку io::copy() вызывается в той же задаче, которой принадлежит TcpStream, мы можем использовать TcpStream::split. Задача, которая обрабатывает логику эха в сервере, становится:
#![allow(unused)] fn main() { use tokio::io; use tokio::net::TcpStream; fn dox(mut socket: TcpStream) { tokio::spawn(async move { let (mut rd, mut wr) = socket.split(); if io::copy(&mut rd, &mut wr).await.is_err() { eprintln!("failed to copy"); } }); } }
Полный код можно найти здесь.
Ручное копирование
Теперь давайте посмотрим, как мы бы написали эхо-сервер, копируя данные вручную. Для этого мы используем AsyncReadExt::read и AsyncWriteExt::write_all.
Полный эхо-сервер выглядит следующим образом:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = vec![0; 1024]; loop { match socket.read(&mut buf).await { // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта Ok(0) => return, Ok(n) => { // Копируем данные обратно в сокет if socket.write_all(&buf[..n]).await.is_err() { // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку. return; } } Err(_) => { // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку. return; } } } }); } } }
(Вы можете поместить этот код в src/bin/echo-server.rs и запустить его с помощью cargo run --bin echo-server).
Давайте разберем его. Во-первых, поскольку используются утилиты AsyncRead и AsyncWrite, трейты расширения должны быть внесены в область видимости.
#![allow(unused)] fn main() { use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; }
Выделение буфера
Стратегия заключается в том, чтобы прочитать некоторые данные из сокета в буфер, а затем записать содержимое буфера обратно в сокет.
#![allow(unused)] fn main() { let mut buf = vec![0; 1024]; }
Явно избегается стековый буфер. Вспомним из ранее, мы отметили, что все данные задачи, которые живут между вызовами .await, должны храниться задачей. В этом случае buf используется между вызовами .await. Все данные задачи хранятся в одном выделении памяти. Вы можете думать об этом как о enum, где каждый вариант - это данные, которые нужно хранить для конкретного вызова .await.
Если буфер представлен стековым массивом, внутренняя структура для задач, созданных для каждого принятого сокета, может выглядеть примерно так:
#![allow(unused)] fn main() { struct Task { // внутренние поля задачи здесь task: enum { AwaitingRead { socket: TcpStream, buf: [BufferType], }, AwaitingWriteAll { socket: TcpStream, buf: [BufferType], } } } }
Если стековый массив используется как тип буфера, он будет храниться встроенным в структуру задачи. Это сделает структуру задачи очень большой. Кроме того, размеры буферов часто равны размеру страницы. Это, в свою очередь, сделает Task неудобного размера: $page-size + несколько-байт.
Компилятор оптимизирует расположение async блоков дальше, чем базовый enum. На практике переменные не перемещаются между вариантами, как это было бы необходимо с enum. Однако размер структуры задачи по крайней мере такой же большой, как самая большая переменная.
Из-за этого обычно более эффективно использовать выделенное выделение памяти для буфера.
Обработка EOF
Когда половина TCP-потока для чтения закрыта, вызов read() возвращает Ok(0). Важно выйти из цикла чтения в этот момент. Забывание прервать цикл чтения при EOF является распространенным источником ошибок.
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use tokio::net::TcpStream; async fn dox(mut socket: TcpStream) { let mut buf = vec![0_u8; 1024]; loop { match socket.read(&mut buf).await { // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта Ok(0) => return, // ... другие случаи обрабатываются здесь _ => unreachable!(), } } } }
Забывание прервать цикл чтения обычно приводит к ситуации бесконечного цикла с 100% загрузкой CPU. Поскольку сокет закрыт, socket.read() возвращается немедленно. Цикл затем повторяется вечно.
Полный код можно найти здесь.