Spawning
Мы переключимся и начнем работать над сервером Redis.
Сначала переместим клиентский код SET/GET из предыдущего раздела в примерный файл. Таким образом, мы сможем запускать его против нашего сервера.
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
Затем создайте новый, пустой src/main.rs и продолжайте.
Принятие сокетов
Первое, что должен сделать наш сервер Redis — это принимать входящие TCP-сокеты. Это делается путем привязки tokio::net::TcpListener к порту 6379.
информация Многие типы Tokio называются так же, как их синхронные эквиваленты в стандартной библиотеке Rust. Когда это имеет смысл, Tokio предоставляет те же API, что и
std, но используяasync fn.
Затем сокеты принимаются в цикле. Каждый сокет обрабатывается, затем закрывается. Пока мы будем читать команду, выводить ее в stdout и отвечать ошибкой.
src/main.rs
use tokio::net::{TcpListener, TcpStream}; use mini_redis::{Connection, Frame}; fn dox() { #[tokio::main] async fn main() { // Привязываем слушателя к адресу let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { // Второй элемент содержит IP и порт нового соединения. let (socket, _) = listener.accept().await.unwrap(); process(socket).await; } } } async fn process(socket: TcpStream) { // `Connection` позволяет нам читать/писать **фреймы** redis вместо потоков байтов. // Тип `Connection` определен в mini-redis. let mut connection = Connection::new(socket); if let Some(frame) = connection.read_frame().await.unwrap() { println!("GOT: {:?}", frame); // Отвечаем ошибкой let response = Frame::Error("unimplemented".to_string()); connection.write_frame(&response).await.unwrap(); } }
Теперь запустите этот цикл принятия:
$ cargo run
В отдельном окне терминала запустите пример hello-redis (команда SET/GET из предыдущего раздела, которая играет роль клиента Redis):
$ cargo run --example hello-redis
Вывод должен быть:
Error: "unimplemented"
В терминале сервера вывод:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
Конкурентность
У нашего сервера есть небольшая проблема (кроме того, что он отвечает только ошибками). Он обрабатывает входящие запросы по одному. Когда соединение принято, сервер остается внутри блокировки цикла принятия до тех пор, пока ответ полностью не записан в сокет.
Мы хотим, чтобы наш сервер Redis обрабатывал много конкурентных запросов. Для этого нам нужно добавить немного конкурентности.
информация Конкурентность и параллелизм — не одно и то же. Если вы переключаетесь между двумя задачами, то вы работаете над обеими задачами конкурентно, но не параллельно. Чтобы это считалось параллельным, вам нужно два человека, каждый из которых посвящен своей задаче.
Одно из преимуществ использования Tokio заключается в том, что асинхронный код позволяет вам работать над многими задачами конкурентно, без необходимости работать над ними параллельно с использованием обычных потоков. Фактически, Tokio может запускать многие задачи конкурентно в одном потоке!
Чтобы обрабатывать соединения конкурентно, для каждого входящего соединения создается новая задача. Соединение обрабатывается в этой задаче.
Цикл принятия становится:
use tokio::net::TcpListener; fn dox() { #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { let (socket, _) = listener.accept().await.unwrap(); // Для каждого входящего сокета создается новая задача. Сокет перемещается в новую задачу и обрабатывается там. tokio::spawn(async move { process(socket).await; }); } } } async fn process(_: tokio::net::TcpStream) {}
Задачи
Задача Tokio — это асинхронный зеленый поток. Они создаются путем передачи async блока в tokio::spawn. Функция tokio::spawn возвращает JoinHandle, который вызывающая сторона может использовать для взаимодействия с созданной задачей. async блок может иметь возвращаемое значение. Вызывающая сторона может получить возвращаемое значение с помощью .await на JoinHandle.
Например:
#[tokio::main] async fn main() { let handle = tokio::spawn(async { // Выполняем некоторую асинхронную работу "return value" }); // Выполняем другую работу let out = handle.await.unwrap(); println!("GOT {}", out); }
Ожидание JoinHandle возвращает Result. Когда задача встречает ошибку во время выполнения, JoinHandle вернет Err. Это происходит, когда задача либо паникует, либо когда задача принудительно отменяется из-за остановки среды выполнения.
Задачи — это единица выполнения, управляемая планировщиком. Создание задачи отправляет ее в планировщик Tokio, который затем обеспечивает выполнение задачи, когда у нее есть работа. Созданная задача может выполняться в том же потоке, где она была создана, или может выполняться в другом потоке среды выполнения. Задача также может быть перемещена между потоками после создания.
Задачи в Tokio очень легковесны. Под капотом они требуют только одно выделение памяти и 64 байта памяти. Приложения должны свободно создавать тысячи, если не миллионы задач.
Ограничение 'static
Когда вы создаете задачу в среде выполнения Tokio, время жизни ее типа должно быть 'static. Это означает, что созданная задача не должна содержать ссылок на данные, владеемые вне задачи.
информация Распространенное заблуждение, что
'staticвсегда означает "живет вечно", но это не так. Только потому, что значение'static, не означает, что у вас утечка памяти. Вы можете прочитать больше в Common Rust Lifetime Misconceptions.
Например, следующий код не скомпилируется:
use tokio::task; #[tokio::main] async fn main() { let v = vec![1, 2, 3]; task::spawn(async { println!("Here's a vec: {:?}", v); }); }
Попытка компиляции приводит к следующей ошибке:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
Это происходит потому, что по умолчанию переменные не перемещаются в async блоки. Вектор v остается во владении функции main. Строка println! заимствует v. Компилятор Rust любезно объясняет это нам и даже предлагает исправление! Изменение строки 7 на task::spawn(async move { предпишет компилятору переместить v в созданную задачу. Теперь задача владеет всеми своими данными, делая их 'static.
Если к одним данным должен быть доступ из более чем одной задачи конкурентно, то они должны быть общими с использованием примитивов синхронизации, таких как Arc.
Обратите внимание, что сообщение об ошибке говорит о том, что тип аргумента переживает время жизни 'static. Эта терминология может быть довольно запутанной, потому что время жизни 'static длится до конца программы, так что если он переживает его, разве это не утечка памяти? Объяснение в том, что это тип, а не значение должен переживать время жизни 'static, и значение может быть уничтожено до того, как его тип станет недействительным.
Когда мы говорим, что значение 'static, все это означает, что не будет ошибкой хранить это значение вечно. Это важно, потому что компилятор не может рассуждать о том, как долго новая задача остается активной. Мы должны убедиться, что задаче разрешено жить вечно, чтобы Tokio мог заставить задачу работать так долго, как это необходимо.
Статья, на которую ссылается предыдущий информационный блок, использует терминологию "ограничено 'static" вместо "его тип переживает 'static" или "значение 'static" для ссылки на T: 'static. Все это означает одно и то же, но отличается от "аннотировано с 'static" как в &'static T.
Ограничение Send
Задачи, созданные с помощью tokio::spawn, должны реализовывать Send. Это позволяет среде выполнения Tokio перемещать задачи между потоками, пока они приостановлены на .await.
Задачи являются Send, когда все данные, которые хранятся между вызовами .await, являются Send. Это немного тонко. Когда вызывается .await, задача возвращается к планировщику. В следующий раз, когда задача выполняется, она возобновляется с точки, где она последний раз уступила. Чтобы это работало, все состояние, которое используется после .await, должно быть сохранено задачей. Если это состояние Send, т.е. может быть перемещено между потоками, то сама задача может быть перемещена между потоками. И наоборот, если состояние не Send, то и задача тоже.
Например, это работает:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { // Область видимости заставляет `rc` удалиться до `.await`. { let rc = Rc::new("hello"); println!("{}", rc); } // `rc` больше не используется. Он **не** сохраняется, когда задача уступает планировщику yield_now().await; }); }
А это нет:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { let rc = Rc::new("hello"); // `rc` используется после `.await`. Он должен быть сохранен в состоянии задачи. yield_now().await; println!("{}", rc); }); }
Попытка компиляции фрагмента приводит к:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
Мы обсудим особый случай этой ошибки более подробно в следующей главе.
Хранение значений
Теперь мы реализуем функцию process для обработки входящих команд. Мы будем использовать HashMap для хранения значений. Команды SET будут вставлять в HashMap, а команды GET будут загружать их. Дополнительно мы будем использовать цикл для принятия более одной команды на соединение.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; async fn process(socket: TcpStream) { use mini_redis::Command::{self, Get, Set}; use std::collections::HashMap; // Для хранения данных используется хэш-карта let mut db = HashMap::new(); // Connection, предоставляемый `mini-redis`, обрабатывает разбор фреймов из сокета let mut connection = Connection::new(socket); // Используем `read_frame` для получения команды из соединения. while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { // Значение хранится как `Vec<u8>` db.insert(cmd.key().to_string(), cmd.value().to_vec()); Frame::Simple("OK".to_string()) } Get(cmd) => { if let Some(value) = db.get(cmd.key()) { // `Frame::Bulk` ожидает данные типа `Bytes`. Этот тип будет рассмотрен позже в руководстве. Пока `&Vec<u8>` преобразуется в `Bytes` с помощью `into()`. Frame::Bulk(value.clone().into()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // Записываем ответ клиенту connection.write_frame(&response).await.unwrap(); } } }
Теперь запустите сервер:
$ cargo run
и в отдельном окне терминала снова запустите пример клиента hello-redis:
$ cargo run --example hello-redis
Теперь вывод клиента будет:
got value from the server; result=Some(b"world")
Теперь мы можем получать и устанавливать значения, но есть проблема: значения не являются общими между соединениями. Если другой сокет подключится и попытается GET ключ hello, он ничего не найдет.
Полный код можно найти здесь.
В следующем разделе мы реализуем сохранение данных для всех сокетов.