I/O

Ввод-вывод в Tokio работает во многом так же, как и в std, но асинхронно. Существует трейт для чтения (AsyncRead) и трейт для записи (AsyncWrite). Конкретные типы реализуют эти трейты соответствующим образом (TcpStream, File, Stdout). AsyncRead и AsyncWrite также реализованы для ряда структур данных, таких как Vec<u8> и &[u8]. Это позволяет использовать байтовые массивы там, где ожидается читатель или писатель.

На этой странице будут рассмотрены базовое чтение и запись ввода-вывода с Tokio и пройдены несколько примеров. На следующей странице мы перейдем к более продвинутому примеру ввода-вывода.

AsyncRead и AsyncWrite

Эти два трейта предоставляют возможности для асинхронного чтения из байтовых потоков и записи в них. Методы этих трейтов обычно не вызываются напрямую, аналогично тому, как вы не вызываете вручную метод poll из трейта Future. Вместо этого вы будете использовать их через вспомогательные методы, предоставляемые AsyncReadExt и AsyncWriteExt.

Давайте кратко рассмотрим несколько из этих методов. Все эти функции являются async и должны использоваться с .await.

async fn read()

AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращая количество прочитанных байтов.

Примечание: когда read() возвращает Ok(0), это означает, что поток закрыт. Любые последующие вызовы read() будут немедленно завершаться с Ok(0). Для экземпляров TcpStream это означает, что половина сокета для чтения закрыта.

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // читаем до 10 байт
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}
}

async fn read_to_end()

AsyncReadExt::read_to_end читает все байты из потока до EOF.

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // читаем весь файл
    f.read_to_end(&mut buffer).await?;
    Ok(())
}
}

async fn write()

AsyncWriteExt::write записывает буфер в писатель, возвращая количество записанных байтов.

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Записывает некоторый префикс байтовой строки, но не обязательно всю.
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}
}

async fn write_all()

AsyncWriteExt::write_all записывает весь буфер в писатель.

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    file.write_all(b"some bytes").await?;
    Ok(())
}
}

Оба трейта включают множество других полезных методов. См. документацию API для полного списка.

Вспомогательные функции

Кроме того, как и в std, модуль tokio::io содержит множество полезных утилитных функций, а также API для работы со стандартным вводом, стандартным выводом и стандартной ошибкой. Например, tokio::io::copy асинхронно копирует все содержимое читателя в писатель.

use tokio::fs::File;
use tokio::io;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}
}

Обратите внимание, что здесь используется тот факт, что байтовые массивы также реализуют AsyncRead.

Эхо-сервер

Давайте попрактикуемся в асинхронном вводе-выводе. Мы напишем эхо-сервер.

Эхо-сервер привязывает TcpListener и принимает входящие соединения в цикле. Для каждого входящего соединения данные читаются из сокета и немедленно записываются обратно в сокет. Клиент отправляет данные на сервер и получает точно такие же данные обратно.

Мы реализуем эхо-сервер дважды, используя немного разные стратегии.

Использование io::copy()

Для начала мы реализуем логику эха с помощью утилиты io::copy.

Вы можете написать этот код в новом бинарном файле:

$ touch src/bin/echo-server-copy.rs

Который вы можете запустить (или просто проверить компиляцию) с помощью:

$ cargo run --bin echo-server-copy

Вы сможете испытать сервер, используя стандартное командное средство, такое как telnet, или написав простой клиент, подобный тому, который находится в документации для tokio::net::TcpStream.

Это TCP-сервер, и ему нужен цикл принятия. Для каждого принятого сокета создается новая задача.

use tokio::io;
use tokio::net::TcpListener;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Копируем данные здесь
        });
    }
}
}

Как было показано ранее, эта утилитная функция принимает читателя и писателя и копирует данные из одного в другой. Однако у нас есть только один TcpStream. Этот единственный значение реализует и AsyncRead, и AsyncWrite. Поскольку io::copy требует &mut как для читателя, так и для писателя, сокет не может быть использован для обоих аргументов.

#![allow(unused)]
fn main() {
// Это не компилируется
io::copy(&mut socket, &mut socket).await
}

Разделение читателя + писателя

Чтобы обойти эту проблему, мы должны разделить сокет на handle читателя и handle писателя. Лучший способ разделить комбинацию читатель/писатель зависит от конкретного типа.

Любой тип читатель + писатель может быть разделен с помощью утилиты io::split. Эта функция принимает одно значение и возвращает отдельные handle читателя и писателя. Эти два handle могут использоваться независимо, в том числе из отдельных задач.

Например, эхо-клиент может обрабатывать конкурентные чтение и запись следующим образом:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Записываем данные в фоне
    tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Иногда rust выводу типов нужна небольшая помощь
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}
}

Поскольку io::split поддерживает любое значение, которое реализует AsyncRead + AsyncWrite, и возвращает независимые handle, внутри io::split используется Arc и Mutex. Этих накладных расходов можно избежать с TcpStream. TcpStream предлагает две специализированные функции разделения.

TcpStream::split принимает ссылку на поток и возвращает handle читателя и писателя. Поскольку используется ссылка, оба handle должны оставаться в той же задаче, из которой был вызван split(). Это специализированное split является бесплатным. Не нужны Arc или Mutex. TcpStream также предоставляет into_split, который поддерживает handle, которые могут перемещаться между задачами, ценой только Arc.

Поскольку io::copy() вызывается в той же задаче, которой принадлежит TcpStream, мы можем использовать TcpStream::split. Задача, которая обрабатывает логику эха в сервере, становится:

#![allow(unused)]
fn main() {
use tokio::io;
use tokio::net::TcpStream;
fn dox(mut socket: TcpStream) {
tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});
}
}

Полный код можно найти здесь.

Ручное копирование

Теперь давайте посмотрим, как мы бы написали эхо-сервер, копируя данные вручную. Для этого мы используем AsyncReadExt::read и AsyncWriteExt::write_all.

Полный эхо-сервер выглядит следующим образом:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта
                    Ok(0) => return,
                    Ok(n) => {
                        // Копируем данные обратно в сокет
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку.
                            return;
                        }
                    }
                    Err(_) => {
                        // Неожиданная ошибка сокета. Здесь мало что можно сделать, поэтому просто останавливаем обработку.
                        return;
                    }
                }
            }
        });
    }
}
}

(Вы можете поместить этот код в src/bin/echo-server.rs и запустить его с помощью cargo run --bin echo-server).

Давайте разберем его. Во-первых, поскольку используются утилиты AsyncRead и AsyncWrite, трейты расширения должны быть внесены в область видимости.

#![allow(unused)]
fn main() {
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
}

Выделение буфера

Стратегия заключается в том, чтобы прочитать некоторые данные из сокета в буфер, а затем записать содержимое буфера обратно в сокет.

#![allow(unused)]
fn main() {
let mut buf = vec![0; 1024];
}

Явно избегается стековый буфер. Вспомним из ранее, мы отметили, что все данные задачи, которые живут между вызовами .await, должны храниться задачей. В этом случае buf используется между вызовами .await. Все данные задачи хранятся в одном выделении памяти. Вы можете думать об этом как о enum, где каждый вариант - это данные, которые нужно хранить для конкретного вызова .await.

Если буфер представлен стековым массивом, внутренняя структура для задач, созданных для каждого принятого сокета, может выглядеть примерно так:

#![allow(unused)]
fn main() {
struct Task {
    // внутренние поля задачи здесь
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}
}

Если стековый массив используется как тип буфера, он будет храниться встроенным в структуру задачи. Это сделает структуру задачи очень большой. Кроме того, размеры буферов часто равны размеру страницы. Это, в свою очередь, сделает Task неудобного размера: $page-size + несколько-байт.

Компилятор оптимизирует расположение async блоков дальше, чем базовый enum. На практике переменные не перемещаются между вариантами, как это было бы необходимо с enum. Однако размер структуры задачи по крайней мере такой же большой, как самая большая переменная.

Из-за этого обычно более эффективно использовать выделенное выделение памяти для буфера.

Обработка EOF

Когда половина TCP-потока для чтения закрыта, вызов read() возвращает Ok(0). Важно выйти из цикла чтения в этот момент. Забывание прервать цикл чтения при EOF является распространенным источником ошибок.

#![allow(unused)]
fn main() {
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
async fn dox(mut socket: TcpStream) {
let mut buf = vec![0_u8; 1024];
loop {
    match socket.read(&mut buf).await {
        // Возвращаемое значение `Ok(0)` означает, что удаленная сторона закрыта
        Ok(0) => return,
        // ... другие случаи обрабатываются здесь
_ => unreachable!(),
    }
}
}
}

Забывание прервать цикл чтения обычно приводит к ситуации бесконечного цикла с 100% загрузкой CPU. Поскольку сокет закрыт, socket.read() возвращается немедленно. Цикл затем повторяется вечно.

Полный код можно найти здесь.