Shared state
До сих пор у нас работал ключ-значение сервер. Однако есть серьезный недостаток: состояние не является общим между соединениями. Мы исправим это в этой статье.
Стратегии
Есть несколько различных способов разделения состояния в Tokio.
- Защита общего состояния с помощью Mutex.
- Создание задачи для управления состоянием и использование передачи сообщений для работы с ним.
Обычно вы хотите использовать первый подход для простых данных и второй подход для вещей, которые требуют асинхронной работы, таких как примитивы I/O. В этой главе общее состояние - это HashMap, а операции - insert и get. Ни одна из этих операций не является асинхронной, поэтому мы будем использовать Mutex.
Второй подход рассматривается в следующей главе.
Добавление зависимости bytes
Вместо использования Vec<u8>, крейт Mini-Redis использует Bytes из крейта bytes. Цель Bytes - предоставить надежную структуру байтового массива для сетевого программирования. Самая большая особенность, которую он добавляет по сравнению с Vec<u8>, - это поверхностное клонирование. Другими словами, вызов clone() на экземпляре Bytes не копирует базовые данные. Вместо этого экземпляр Bytes является ссылочно-считаемым handle к некоторым базовым данным. Тип Bytes примерно похож на Arc<Vec<u8>>, но с некоторыми дополнительными возможностями.
Чтобы добавить зависимость bytes, добавьте следующее в ваш Cargo.toml в раздел [dependencies]:
bytes = "1"
Инициализация HashMap
HashMap будет общим для многих задач и потенциально многих потоков. Для поддержки этого он обернут в Arc<Mutex<_>>.
Сначала для удобства добавьте следующий псевдоним типа после операторов use.
#![allow(unused)] fn main() { use bytes::Bytes; use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Db = Arc<Mutex<HashMap<String, Bytes>>>; }
Затем обновите функцию main для инициализации HashMap и передачи handle Arc в функцию process. Использование Arc позволяет на HashMap ссылаться конкурентно из многих задач, потенциально выполняющихся на многих потоках. В Tokio термин handle используется для ссылки на значение, которое предоставляет доступ к некоторому общему состоянию.
use tokio::net::TcpListener; use std::collections::HashMap; use std::sync::{Arc, Mutex}; fn dox() { #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); println!("Listening"); let db = Arc::new(Mutex::new(HashMap::new())); loop { let (socket, _) = listener.accept().await.unwrap(); // Клонируем handle к хэш-карте. let db = db.clone(); println!("Accepted"); tokio::spawn(async move { process(socket, db).await; }); } } } type Db = Arc<Mutex<HashMap<(), ()>>>; async fn process(_: tokio::net::TcpStream, _: Db) {}
О использовании std::sync::Mutex и tokio::sync::Mutex
Обратите внимание, что std::sync::Mutex и не tokio::sync::Mutex используется для защиты HashMap. Распространенная ошибка - безусловно использовать tokio::sync::Mutex в асинхронном коде. Асинхронный мьютекс - это мьютекс, который блокируется при вызовах .await.
Синхронный мьютекс будет блокировать текущий поток при ожидании получения блокировки. Это, в свою очередь, заблокирует обработку других задач. Однако переключение на tokio::sync::Mutex обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс внутри.
Как правило, использование синхронного мьютекса в асинхронном коде допустимо, пока конкуренция остается низкой и блокировка не удерживается при вызовах .await.
Обновление process()
Функция process больше не инициализирует HashMap. Вместо этого она принимает общий handle к HashMap в качестве аргумента. Также необходимо заблокировать HashMap перед использованием. Помните, что тип значения для HashMap теперь Bytes (который мы можем дешево клонировать), поэтому это также нужно изменить.
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Db = Arc<Mutex<HashMap<String, bytes::Bytes>>>; async fn process(socket: TcpStream, db: Db) { use mini_redis::Command::{self, Get, Set}; // Connection, предоставляемый `mini-redis`, обрабатывает разбор фреймов из сокета let mut connection = Connection::new(socket); while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { let mut db = db.lock().unwrap(); db.insert(cmd.key().to_string(), cmd.value().clone()); Frame::Simple("OK".to_string()) } Get(cmd) => { let db = db.lock().unwrap(); if let Some(value) = db.get(cmd.key()) { Frame::Bulk(value.clone()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // Записываем ответ клиенту connection.write_frame(&response).await.unwrap(); } } }
Удержание MutexGuard через .await
Вы можете написать код, который выглядит так:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; do_something_async().await; } // lock выходит из области видимости здесь async fn do_something_async() {} }
Когда вы попытаетесь создать что-то, что вызывает эту функцию, вы столкнетесь со следующей ошибкой:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
Это происходит потому, что тип std::sync::MutexGuard не является Send. Это означает, что вы не можете отправить блокировку мьютекса в другой поток, и ошибка возникает потому, что среда выполнения Tokio может перемещать задачу между потоками при каждом .await. Чтобы избежать этого, вы должны переструктурировать ваш код так, чтобы деструктор блокировки мьютекса выполнялся до .await.
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; // Это работает! async fn increment_and_do_stuff(mutex: &Mutex<i32>) { { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; } // lock выходит из области видимости здесь do_something_async().await; } async fn do_something_async() {} }
Обратите внимание, что это не работает:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; // Это тоже не работает. async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; drop(lock); do_something_async().await; } async fn do_something_async() {} }
Это потому, что компилятор в настоящее время вычисляет, является ли future Send, на основе только информации об области видимости. Надеюсь, компилятор будет обновлен для поддержки явного удаления в будущем, но сейчас вы должны явно использовать область видимости.
Обратите внимание, что ошибка, обсуждаемая здесь, также обсуждается в разделе Send bound из главы о создании задач.
Вы не должны пытаться обойти эту проблему, создавая задачу способом, который не требует, чтобы она была Send, потому что если Tokio приостановит вашу задачу на .await, пока задача удерживает блокировку, другая задача может быть запланирована на выполнение в том же потоке, и эта другая задача также может попытаться заблокировать этот мьютекс, что приведет к взаимной блокировке, поскольку задача, ожидающая блокировки мьютекса, предотвратит освобождение мьютекса задачей, удерживающей мьютекс.
Имейте в виду, что некоторые крейты мьютексов реализуют Send для своих MutexGuards. В этом случае нет ошибки компилятора, даже если вы удерживаете MutexGuard через .await. Код компилируется, но он взаимно блокируется!
Мы обсудим некоторые подходы к избежанию этих проблем ниже:
Переструктурируйте ваш код, чтобы не удерживать блокировку через .await
Самый безопасный способ обработки мьютекса - это обернуть его в структуру и блокировать мьютекс только внутри не-асинхронных методов этой структуры.
#![allow(unused)] fn main() { use std::sync::Mutex; struct CanIncrement { mutex: Mutex<i32>, } impl CanIncrement { // Эта функция не помечена как async. fn increment(&self) { let mut lock = self.mutex.lock().unwrap(); *lock += 1; } } async fn increment_and_do_stuff(can_incr: &CanIncrement) { can_incr.increment(); do_something_async().await; } async fn do_something_async() {} }
Этот шаблон гарантирует, что вы не столкнетесь с ошибкой Send, потому что защита мьютекса не появляется нигде в асинхронной функции. Это также защищает вас от взаимных блокировок при использовании крейтов, чей MutexGuard реализует Send.
Вы можете найти более подробный пример в этом посте блога.
Создайте задачу для управления состоянием и используйте передачу сообщений для работы с ним
Это второй подход, упомянутый в начале этой главы, и он часто используется, когда общий ресурс является ресурсом I/O. Смотрите следующую главу для более подробной информации.
Используйте асинхронный мьютекс Tokio
Тип tokio::sync::Mutex, предоставляемый Tokio, также может быть использован. Основная особенность мьютекса Tokio заключается в том, что его можно удерживать через .await без каких-либо проблем. Тем не менее, асинхронный мьютекс дороже, чем обычный мьютекс, и обычно лучше использовать один из двух других подходов.
#![allow(unused)] fn main() { use tokio::sync::Mutex; //注意! Это использует мьютекс Tokio // Это компилируется! // (но переструктурирование кода было бы лучше в этом случае) async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock = mutex.lock().await; *lock += 1; do_something_async().await; } // lock выходит из области видимости здесь async fn do_something_async() {} }
Задачи, потоки и конкуренция
Использование блокирующего мьютекса для защиты коротких критических секций является приемлемой стратегией, когда конкуренция минимальна. Когда блокировка оспаривается, поток, выполняющий задачу, должен блокироваться и ждать мьютекс. Это не только заблокирует текущую задачу, но также заблокирует все другие задачи, запланированные на текущем потоке.
По умолчанию среда выполнения Tokio использует многопоточный планировщик. Задачи планируются на любом количестве потоков, управляемых средой выполнения. Если большое количество задач запланировано на выполнение и все они требуют доступа к мьютексу, то будет конкуренция. С другой стороны, если используется вариант среды выполнения current_thread, то мьютекс никогда не будет оспариваться.
информация Вариант среды выполнения
current_thread- это легковесная однопоточная среда выполнения. Это хороший выбор, когда создается только несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении синхронного API-моста поверх асинхронной клиентской библиотеки.
Если конкуренция на синхронном мьютексе становится проблемой, лучшим исправлением редко является переключение на мьютекс Tokio. Вместо этого варианты, которые следует рассмотреть, это:
- Позволить выделенной задаче управлять состоянием и использовать передачу сообщений.
- Шардировать мьютекс.
- Переструктурировать код, чтобы избежать мьютекса.
Шардирование мьютекса
В нашем случае, поскольку каждый ключ независим, шардирование мьютекса будет работать хорошо. Для этого вместо одного экземпляра Mutex<HashMap<_, _>> мы введем N различных экземпляров.
#![allow(unused)] fn main() { use std::collections::HashMap; use std::sync::{Arc, Mutex}; type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>; fn new_sharded_db(num_shards: usize) -> ShardedDb { let mut db = Vec::with_capacity(num_shards); for _ in 0..num_shards { db.push(Mutex::new(HashMap::new())); } Arc::new(db) } }
Затем поиск ячейки для любого заданного ключа становится двухэтапным процессом. Сначала ключ используется для идентификации, к какому шарду он принадлежит. Затем ключ ищется в HashMap.
#![allow(unused)] fn main() { let shard = db[hash(key) % db.len()].lock().unwrap(); shard.insert(key, value); }
Простая реализация, описанная выше, требует использования фиксированного числа шардов, и количество шардов не может быть изменено после создания шардированной карты.
Крейт dashmap предоставляет реализацию более сложной шардированной хэш-карты. Вы также можете посмотреть на такие реализации конкурентных хэш-таблиц, как leapfrog и flurry, последняя является портом структуры данных ConcurrentHashMap Java.
Прежде чем вы начнете использовать любой из этих крейтов, убедитесь, что вы структурировали свой код так, чтобы вы не могли удерживать MutexGuard через .await. Если вы этого не сделаете, у вас либо будут ошибки компилятора (в случае не-Send защит), либо ваш код взаимно заблокируется (в случае Send защит). Смотрите полный пример и больше контекста в этом посте блога.