Framing
Теперь мы применим то, что только что узнали о вводе-выводе, и реализуем слой фрейминга для Mini-Redis. Фрейминг - это процесс преобразования байтового потока в поток фреймов. Фрейм - это единица данных, передаваемая между двумя узлами. Протокольный фрейм Redis определяется следующим образом:
#![allow(unused)] fn main() { use bytes::Bytes; enum Frame { Simple(String), Error(String), Integer(u64), Bulk(Bytes), Null, Array(Vec<Frame>), } }
Обратите внимание, что фрейм состоит только из данных без какой-либо семантики. Разбор команд и их реализация происходят на более высоком уровне.
Для HTTP фрейм может выглядеть так:
#![allow(unused)] fn main() { use bytes::Bytes; type Method = (); type Uri = (); type Version = (); type HeaderMap = (); type StatusCode = (); enum HttpFrame { RequestHead { method: Method, uri: Uri, version: Version, headers: HeaderMap, }, ResponseHead { status: StatusCode, version: Version, headers: HeaderMap, }, BodyChunk { chunk: Bytes, }, } }
Чтобы реализовать фрейминг для Mini-Redis, мы реализуем структуру Connection, которая оборачивает TcpStream и читает/пишет значения mini_redis::Frame.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Frame, Result}; struct Connection { stream: TcpStream, // ... другие поля здесь } impl Connection { /// Читает фрейм из соединения. /// /// Возвращает `None`, если достигнут EOF pub async fn read_frame(&mut self) -> Result<Option<Frame>> { // реализация здесь unimplemented!(); } /// Записывает фрейм в соединение. pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> { // реализация здесь unimplemented!(); } } }
Подробности протокола Redis можно найти здесь. Полный код Connection находится здесь.
Буферизованное чтение
Метод read_frame ожидает получения всего фрейма перед возвратом. Один вызов TcpStream::read() может вернуть произвольное количество данных. Это может содержать целый фрейм, частичный фрейм или несколько фреймов. Если получен частичный фрейм, данные буферизуются и считываются дополнительные данные из сокета. Если получено несколько фреймов, возвращается первый фрейм, а остальные данные буферизуются до следующего вызова read_frame.
Если вы еще этого не сделали, создайте новый файл с именем connection.rs.
touch src/connection.rs
Для реализации этого Connection нуждается в поле буфера чтения. Данные считываются из сокета в буфер чтения. Когда фрейм разбирается, соответствующие данные удаляются из буфера.
Мы будем использовать BytesMut в качестве типа буфера. Это изменяемая версия Bytes.
#![allow(unused)] fn main() { use bytes::BytesMut; use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // Выделяем буфер вместимостью 4 КБ. buffer: BytesMut::with_capacity(4096), } } } }
Далее мы реализуем метод read_frame().
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use bytes::Buf; use mini_redis::Result; struct Connection { stream: tokio::net::TcpStream, buffer: bytes::BytesMut, } struct Frame {} impl Connection { pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { // Попытка разобрать фрейм из буферизованных данных. Если // было буферизовано достаточно данных, фрейм возвращается. if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // Недостаточно буферизованных данных для чтения фрейма. // Попытка прочитать больше данных из сокета. // // При успехе возвращается количество байтов. `0` // указывает на "конец потока". if 0 == self.stream.read_buf(&mut self.buffer).await? { // Удаленная сторона закрыла соединение. Для чистого // завершения в буфере чтения не должно быть данных. // Если они есть, это означает, что узел закрыл сокет // во время отправки фрейма. if self.buffer.is_empty() { return Ok(None); } else { return Err("connection reset by peer".into()); } } } } fn parse_frame(&self) -> Result<Option<Frame>> { unimplemented!() } } }
Давайте разберем это. Метод read_frame работает в цикле. Сначала вызывается self.parse_frame(). Это попытается разобрать фрейм redis из self.buffer. Если есть достаточно данных для разбора фрейма, фрейм возвращается вызывающей стороне read_frame(). В противном случае мы пытаемся прочитать больше данных из сокета в буфер. После чтения дополнительных данных parse_frame() вызывается снова. На этот раз, если было получено достаточно данных, разбор может быть успешным.
При чтении из потока возвращаемое значение 0 указывает, что больше данных от узла получено не будет. Если в буфере чтения все еще есть данные, это указывает на то, что был получен частичный фрейм и соединение завершается внезапно. Это ошибочная ситуация, и возвращается Err.
Трейт Buf
При чтении из потока вызывается read_buf. Эта версия функции чтения принимает значение, реализующее BufMut из крейта bytes.
Сначала рассмотрим, как мы бы реализовали тот же цикл чтения с помощью read(). Вместо BytesMut можно использовать Vec<u8>.
#![allow(unused)] fn main() { use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // Выделяем буфер вместимостью 4 КБ. buffer: vec![0; 4096], cursor: 0, } } } }
И функция read_frame() на Connection:
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use tokio::io::AsyncReadExt; pub struct Connection { stream: tokio::net::TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // Убеждаемся, что буфер имеет емкость if self.buffer.len() == self.cursor { // Увеличиваем буфер self.buffer.resize(self.cursor * 2, 0); } // Читаем в буфер, отслеживая количество // прочитанных байтов let n = self.stream.read( &mut self.buffer[self.cursor..]).await?; if 0 == n { if self.cursor == 0 { return Ok(None); } else { return Err("connection reset by peer".into()); } } else { // Обновляем наш курсор self.cursor += n; } } } fn parse_frame(&mut self) -> Result<Option<Frame>> { unimplemented!() } } }
При работе с байтовыми массивами и read мы также должны поддерживать курсор, отслеживающий, сколько данных было буферизовано. Мы должны убедиться, что передаем пустую часть буфера в read(). В противном случае мы перезапишем буферизованные данные. Если наш буфер заполняется, мы должны увеличить буфер, чтобы продолжить чтение. В parse_frame() (не включено) нам нужно будет разбирать данные, содержащиеся в self.buffer[..self.cursor].
Поскольку сочетание байтового массива с курсором очень распространено, крейт bytes предоставляет абстракцию, представляющую байтовый массив и курсор. Трейт Buf реализуется типами, из которых можно читать данные. Трейт BufMut реализуется типами, в которые можно записывать данные. При передаче T: BufMut в read_buf() внутренний курсор буфера автоматически обновляется read_buf. Благодаря этому в нашей версии read_frame нам не нужно управлять собственным курсором.
Кроме того, при использовании Vec<u8> буфер должен быть инициализирован. vec![0; 4096] выделяет массив из 4096 байтов и записывает ноль в каждую запись. При изменении размера буфера новая емкость также должна быть инициализирована нулями. Процесс инициализации не бесплатен. При работе с BytesMut и BufMut емкость не инициализирована. Абстракция BytesMut предотвращает чтение неинициализированной памяти. Это позволяет нам избежать шага инициализации.
Разбор
Теперь давайте посмотрим на функцию parse_frame(). Разбор выполняется в два этапа.
- Убедиться, что буферизован полный фрейм, и найти конечный индекс фрейма.
- Разобрать фрейм.
Крейт mini-redis предоставляет нам функцию для обоих этих этапов:
Мы также будем повторно использовать абстракцию Buf для помощи. Buf передается в Frame::check. Поскольку функция check перебирает переданный буфер, внутренний курсор будет продвигаться. Когда check возвращается, внутренний курсор буфера указывает на конец фрейма.
Для типа Buf мы будем использовать std::io::Cursor<&[u8]>.
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use mini_redis::frame::Error::Incomplete; use bytes::Buf; use std::io::Cursor; pub struct Connection { stream: tokio::net::TcpStream, buffer: bytes::BytesMut, } impl Connection { fn parse_frame(&mut self) -> Result<Option<Frame>> { // Создаем тип `T: Buf`. let mut buf = Cursor::new(&self.buffer[..]); // Проверяем, доступен ли полный фрейм match Frame::check(&mut buf) { Ok(_) => { // Получаем длину фрейма в байтах let len = buf.position() as usize; // Сбрасываем внутренний курсор для // вызова `parse`. buf.set_position(0); // Разбираем фрейм let frame = Frame::parse(&mut buf)?; // Удаляем фрейм из буфера self.buffer.advance(len); // Возвращаем фрейм вызывающей стороне. Ok(Some(frame)) } // Недостаточно данных было буферизовано Err(Incomplete) => Ok(None), // Произошла ошибка Err(e) => Err(e.into()), } } } }
Полную функцию Frame::check можно найти здесь. Мы не будем рассматривать ее полностью.
Важно отметить, что используются API Buf в стиле "байтового итератора". Они извлекают данные и продвигают внутренний курсор. Например, чтобы разобрать фрейм, проверяется первый байт, чтобы определить тип фрейма. Используемая функция - Buf::get_u8. Это извлекает байт в текущей позиции курсора и продвигает курсор на единицу.
Есть еще полезные методы в трейте Buf. Проверьте документацию API для более подробной информации.
Буферизованная запись
Другая половина API фрейминга - это функция write_frame(frame). Эта функция записывает целый фрейм в сокет. Чтобы минимизировать системные вызовы write, записи будут буферизоваться. Поддерживается буфер записи, и фреймы кодируются в этот буфер перед записью в сокет. Однако, в отличие от read_frame(), весь фрейм не всегда буферизуется в байтовый массив перед записью в сокет.
Рассмотрим фрейм массовой передачи. Записываемое значение - Frame::Bulk(Bytes). Проводной формат массового фрейма - это заголовок фрейма, состоящий из символа $, за которым следует длина данных в байтах. Большая часть фрейма - это содержимое значения Bytes. Если данные большие, копирование их в промежуточный буфер будет дорогостоящим.
Для реализации буферизованной записи мы будем использовать BufWriter struct. Эта структура инициализируется с T: AsyncWrite и сама реализует AsyncWrite. Когда write вызывается на BufWriter, запись идет не напрямую к внутреннему писателю, а в буфер. Когда буфер заполняется, содержимое сбрасывается во внутренний писатель, и внутренний буфер очищается. Также есть оптимизации, позволяющие обходить буфер в определенных случаях.
Мы не будем пытаться выполнить полную реализацию write_frame() в рамках руководства. См. полную реализацию здесь.
Сначала обновляется структура Connection:
#![allow(unused)] fn main() { use tokio::io::BufWriter; use tokio::net::TcpStream; use bytes::BytesMut; pub struct Connection { stream: BufWriter<TcpStream>, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream: BufWriter::new(stream), buffer: BytesMut::with_capacity(4096), } } } }
Далее реализуется write_frame().
#![allow(unused)] fn main() { use tokio::io::{self, AsyncWriteExt}; use mini_redis::Frame; struct Connection { stream: tokio::io::BufWriter<tokio::net::TcpStream>, buffer: bytes::BytesMut, } impl Connection { async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { match frame { Frame::Simple(val) => { self.stream.write_u8(b'+').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Error(val) => { self.stream.write_u8(b'-').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Integer(val) => { self.stream.write_u8(b':').await?; self.write_decimal(*val).await?; } Frame::Null => { self.stream.write_all(b"$-1\r\n").await?; } Frame::Bulk(val) => { let len = val.len(); self.stream.write_u8(b'$').await?; self.write_decimal(len as u64).await?; self.stream.write_all(val).await?; self.stream.write_all(b"\r\n").await?; } Frame::Array(_val) => unimplemented!(), } self.stream.flush().await; Ok(()) } async fn write_decimal(&mut self, val: u64) -> io::Result<()> { unimplemented!() } } }
Используемые здесь функции предоставляются AsyncWriteExt. Они доступны и на TcpStream, но не рекомендуется выполнять однобайтовые записи без промежуточного буфера.
write_u8записывает один байт в писатель.write_allзаписывает весь срез в писатель.write_decimalреализована mini-redis.
Функция завершается вызовом self.stream.flush().await. Поскольку BufWriter хранит записи в промежуточном буфере, вызовы write не гарантируют, что данные записаны в сокет. Перед возвратом мы хотим, чтобы фрейм был записан в сокет. Вызов flush() записывает любые данные, ожидающие в буфере, в сокет.
Другая альтернатива - не вызывать flush() в write_frame(). Вместо этого предоставить функцию flush() на Connection. Это позволило бы вызывающей стороне записать несколько маленьких фреймов в буфер записи, а затем записать их все в сокет одним системным вызовом write. Это усложняет API Connection. Простота - одна из целей Mini-Redis, поэтому мы решили включить вызов flush().await в fn write_frame().