Framing

Теперь мы применим то, что только что узнали о вводе-выводе, и реализуем слой фрейминга для Mini-Redis. Фрейминг - это процесс преобразования байтового потока в поток фреймов. Фрейм - это единица данных, передаваемая между двумя узлами. Протокольный фрейм Redis определяется следующим образом:

#![allow(unused)]
fn main() {
use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}
}

Обратите внимание, что фрейм состоит только из данных без какой-либо семантики. Разбор команд и их реализация происходят на более высоком уровне.

Для HTTP фрейм может выглядеть так:

#![allow(unused)]
fn main() {
use bytes::Bytes;
type Method = ();
type Uri = ();
type Version = ();
type HeaderMap = ();
type StatusCode = ();
enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}
}

Чтобы реализовать фрейминг для Mini-Redis, мы реализуем структуру Connection, которая оборачивает TcpStream и читает/пишет значения mini_redis::Frame.

#![allow(unused)]
fn main() {
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... другие поля здесь
}

impl Connection {
    /// Читает фрейм из соединения.
    /// 
    /// Возвращает `None`, если достигнут EOF
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // реализация здесь
unimplemented!();
    }

    /// Записывает фрейм в соединение.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // реализация здесь
unimplemented!();
    }
}
}

Подробности протокола Redis можно найти здесь. Полный код Connection находится здесь.

Буферизованное чтение

Метод read_frame ожидает получения всего фрейма перед возвратом. Один вызов TcpStream::read() может вернуть произвольное количество данных. Это может содержать целый фрейм, частичный фрейм или несколько фреймов. Если получен частичный фрейм, данные буферизуются и считываются дополнительные данные из сокета. Если получено несколько фреймов, возвращается первый фрейм, а остальные данные буферизуются до следующего вызова read_frame.

Если вы еще этого не сделали, создайте новый файл с именем connection.rs.

touch src/connection.rs

Для реализации этого Connection нуждается в поле буфера чтения. Данные считываются из сокета в буфер чтения. Когда фрейм разбирается, соответствующие данные удаляются из буфера.

Мы будем использовать BytesMut в качестве типа буфера. Это изменяемая версия Bytes.

#![allow(unused)]
fn main() {
use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Выделяем буфер вместимостью 4 КБ.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}
}

Далее мы реализуем метод read_frame().

#![allow(unused)]
fn main() {
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

struct Connection {
  stream: tokio::net::TcpStream,
  buffer: bytes::BytesMut,
}
struct Frame {}
impl Connection {
pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Попытка разобрать фрейм из буферизованных данных. Если
        // было буферизовано достаточно данных, фрейм возвращается.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Недостаточно буферизованных данных для чтения фрейма.
        // Попытка прочитать больше данных из сокета.
        //
        // При успехе возвращается количество байтов. `0`
        // указывает на "конец потока".
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // Удаленная сторона закрыла соединение. Для чистого
            // завершения в буфере чтения не должно быть данных.
            // Если они есть, это означает, что узел закрыл сокет
            // во время отправки фрейма.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}
fn parse_frame(&self) -> Result<Option<Frame>> { unimplemented!() }
}
}

Давайте разберем это. Метод read_frame работает в цикле. Сначала вызывается self.parse_frame(). Это попытается разобрать фрейм redis из self.buffer. Если есть достаточно данных для разбора фрейма, фрейм возвращается вызывающей стороне read_frame(). В противном случае мы пытаемся прочитать больше данных из сокета в буфер. После чтения дополнительных данных parse_frame() вызывается снова. На этот раз, если было получено достаточно данных, разбор может быть успешным.

При чтении из потока возвращаемое значение 0 указывает, что больше данных от узла получено не будет. Если в буфере чтения все еще есть данные, это указывает на то, что был получен частичный фрейм и соединение завершается внезапно. Это ошибочная ситуация, и возвращается Err.

Трейт Buf

При чтении из потока вызывается read_buf. Эта версия функции чтения принимает значение, реализующее BufMut из крейта bytes.

Сначала рассмотрим, как мы бы реализовали тот же цикл чтения с помощью read(). Вместо BytesMut можно использовать Vec<u8>.

#![allow(unused)]
fn main() {
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Выделяем буфер вместимостью 4 КБ.
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}
}

И функция read_frame() на Connection:

#![allow(unused)]
fn main() {
use mini_redis::{Frame, Result};

use tokio::io::AsyncReadExt;
pub struct Connection {
    stream: tokio::net::TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}
impl Connection {
pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Убеждаемся, что буфер имеет емкость
        if self.buffer.len() == self.cursor {
            // Увеличиваем буфер
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Читаем в буфер, отслеживая количество
        // прочитанных байтов
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Обновляем наш курсор
            self.cursor += n;
        }
    }
}
fn parse_frame(&mut self) -> Result<Option<Frame>> { unimplemented!() }
}
}

При работе с байтовыми массивами и read мы также должны поддерживать курсор, отслеживающий, сколько данных было буферизовано. Мы должны убедиться, что передаем пустую часть буфера в read(). В противном случае мы перезапишем буферизованные данные. Если наш буфер заполняется, мы должны увеличить буфер, чтобы продолжить чтение. В parse_frame() (не включено) нам нужно будет разбирать данные, содержащиеся в self.buffer[..self.cursor].

Поскольку сочетание байтового массива с курсором очень распространено, крейт bytes предоставляет абстракцию, представляющую байтовый массив и курсор. Трейт Buf реализуется типами, из которых можно читать данные. Трейт BufMut реализуется типами, в которые можно записывать данные. При передаче T: BufMut в read_buf() внутренний курсор буфера автоматически обновляется read_buf. Благодаря этому в нашей версии read_frame нам не нужно управлять собственным курсором.

Кроме того, при использовании Vec<u8> буфер должен быть инициализирован. vec![0; 4096] выделяет массив из 4096 байтов и записывает ноль в каждую запись. При изменении размера буфера новая емкость также должна быть инициализирована нулями. Процесс инициализации не бесплатен. При работе с BytesMut и BufMut емкость не инициализирована. Абстракция BytesMut предотвращает чтение неинициализированной памяти. Это позволяет нам избежать шага инициализации.

Разбор

Теперь давайте посмотрим на функцию parse_frame(). Разбор выполняется в два этапа.

  1. Убедиться, что буферизован полный фрейм, и найти конечный индекс фрейма.
  2. Разобрать фрейм.

Крейт mini-redis предоставляет нам функцию для обоих этих этапов:

  1. Frame::check
  2. Frame::parse

Мы также будем повторно использовать абстракцию Buf для помощи. Buf передается в Frame::check. Поскольку функция check перебирает переданный буфер, внутренний курсор будет продвигаться. Когда check возвращается, внутренний курсор буфера указывает на конец фрейма.

Для типа Buf мы будем использовать std::io::Cursor<&[u8]>.

#![allow(unused)]
fn main() {
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

pub struct Connection {
    stream: tokio::net::TcpStream,
    buffer: bytes::BytesMut,
}
impl Connection {
fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Создаем тип `T: Buf`.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Проверяем, доступен ли полный фрейм
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Получаем длину фрейма в байтах
            let len = buf.position() as usize;

            // Сбрасываем внутренний курсор для
            // вызова `parse`.
            buf.set_position(0);

            // Разбираем фрейм
            let frame = Frame::parse(&mut buf)?;

            // Удаляем фрейм из буфера
            self.buffer.advance(len);

            // Возвращаем фрейм вызывающей стороне.
            Ok(Some(frame))
        }
        // Недостаточно данных было буферизовано
        Err(Incomplete) => Ok(None),
        // Произошла ошибка
        Err(e) => Err(e.into()),
    }
}
}
}

Полную функцию Frame::check можно найти здесь. Мы не будем рассматривать ее полностью.

Важно отметить, что используются API Buf в стиле "байтового итератора". Они извлекают данные и продвигают внутренний курсор. Например, чтобы разобрать фрейм, проверяется первый байт, чтобы определить тип фрейма. Используемая функция - Buf::get_u8. Это извлекает байт в текущей позиции курсора и продвигает курсор на единицу.

Есть еще полезные методы в трейте Buf. Проверьте документацию API для более подробной информации.

Буферизованная запись

Другая половина API фрейминга - это функция write_frame(frame). Эта функция записывает целый фрейм в сокет. Чтобы минимизировать системные вызовы write, записи будут буферизоваться. Поддерживается буфер записи, и фреймы кодируются в этот буфер перед записью в сокет. Однако, в отличие от read_frame(), весь фрейм не всегда буферизуется в байтовый массив перед записью в сокет.

Рассмотрим фрейм массовой передачи. Записываемое значение - Frame::Bulk(Bytes). Проводной формат массового фрейма - это заголовок фрейма, состоящий из символа $, за которым следует длина данных в байтах. Большая часть фрейма - это содержимое значения Bytes. Если данные большие, копирование их в промежуточный буфер будет дорогостоящим.

Для реализации буферизованной записи мы будем использовать BufWriter struct. Эта структура инициализируется с T: AsyncWrite и сама реализует AsyncWrite. Когда write вызывается на BufWriter, запись идет не напрямую к внутреннему писателю, а в буфер. Когда буфер заполняется, содержимое сбрасывается во внутренний писатель, и внутренний буфер очищается. Также есть оптимизации, позволяющие обходить буфер в определенных случаях.

Мы не будем пытаться выполнить полную реализацию write_frame() в рамках руководства. См. полную реализацию здесь.

Сначала обновляется структура Connection:

#![allow(unused)]
fn main() {
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}
}

Далее реализуется write_frame().

#![allow(unused)]
fn main() {
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

struct Connection {
  stream: tokio::io::BufWriter<tokio::net::TcpStream>,
  buffer: bytes::BytesMut,
}
impl Connection {
async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}
async fn write_decimal(&mut self, val: u64) -> io::Result<()> { unimplemented!() }
}
}

Используемые здесь функции предоставляются AsyncWriteExt. Они доступны и на TcpStream, но не рекомендуется выполнять однобайтовые записи без промежуточного буфера.

  • write_u8 записывает один байт в писатель.
  • write_all записывает весь срез в писатель.
  • write_decimal реализована mini-redis.

Функция завершается вызовом self.stream.flush().await. Поскольку BufWriter хранит записи в промежуточном буфере, вызовы write не гарантируют, что данные записаны в сокет. Перед возвратом мы хотим, чтобы фрейм был записан в сокет. Вызов flush() записывает любые данные, ожидающие в буфере, в сокет.

Другая альтернатива - не вызывать flush() в write_frame(). Вместо этого предоставить функцию flush() на Connection. Это позволило бы вызывающей стороне записать несколько маленьких фреймов в буфер записи, а затем записать их все в сокет одним системным вызовом write. Это усложняет API Connection. Простота - одна из целей Mini-Redis, поэтому мы решили включить вызов flush().await в fn write_frame().