Использование асинхронного рантайма Tokio в Rust для CPU-интенсивных задач

Jan 14th, 2022 Источник Автор: Andrew Lamb
Несмотря на термин "async" и его ассоциацию с асинхронным сетевым вводом-выводом, эта статья доказывает, что рантайм Tokio, находящийся в центре асинхронной экосистемы Rust, также является хорошим выбором для CPU-интенсивных задач, таких как те, что встречаются в аналитических движках.
Что такое Tokio?
Rust имеет встроенную поддержку асинхронной (async) модели программирования, подобно таким языкам, как JavaScript.
Чтобы в полной мере использовать преимущества многоядерности и асинхронного ввода-вывода, необходимо использовать рантайм, и хотя сообщество Rust предлагает несколько альтернатив, Tokio является де-факто стандартом. Tokio.rs описывает его как: «асинхронный рантайм для языка программирования Rust. Он предоставляет строительные блоки, необходимые для написания сетевых приложений».
Хотя это описание подчеркивает использование Tokio для сетевых коммуникаций, рантайм можно использовать и для других целей, как мы исследуем ниже.
Почему использовать Tokio для CPU-задач?
Оказывается, современным аналитическим движкам неизбежно необходимо обрабатывать клиентские запросы из сети, а также использовать сеть для взаимодействия с системами объектного хранилища, такими как AWS S3, GCP Cloud Storage и Azure Blob Storage.
Таким образом, любая такая система, реализованная на Rust, в конечном итоге будет использовать Tokio для своих сетевых и, по крайней мере, части задач ввода-вывода хранилища (да, я знаю, изначально асинхронный файловый ввод-вывод Tokio не совсем асинхронный, но это скоро исправят).
Аналитические системы также выполняют CPU-интенсивные вычисления, которые я определяю как обработку данных способом, потребляющим большое количество CPU для реорганизации хранилища, предварительного вычисления различных индексов или непосредственного ответа на клиентские запросы. Эти вычисления обычно разбиваются на множество независимых фрагментов, которые я буду называть «задачами», а затем запускаются параллельно, чтобы использовать преимущества множества ядер, доступных в современных процессорах.
Определение того, какие задачи и когда запускать, обычно выполняется так называемым «планировщиком задач» (task scheduler), который сопоставляет задачи с доступными ядрами / потоками операционной системы.
Существуют годы академических и промышленных работ по различным типам планировщиков задач, пулов рабочих, пулов потоков и тому подобного.
Мой опыт работы с несколькими пользовательскими планировщиками задач (и, к своему стыду, их реализации) показывает, что их легко заставить работать изначально (скажем, в 99,9% случаев), но затем требуется много (много!) времени, чтобы разобраться с corner cases (быстрое завершение, отмена задач, осушение и т.д.). Их также печально известно сложно тестировать из-за использования низкоуровневых примитивов потоков, и здесь изобилуют состояния гонки. Я бы не рекомендовал этого делать.
Таким образом, при поиске планировщика задач в экосистеме Rust, как мы делали для InfluxDB IOx и DataFusion, вы естественным образом приходите к Tokio, и он выглядит довольно хорошо:
- У вас уже есть Tokio (без новых зависимостей).
- Tokio реализует сложный планировщик с кражевой работой (work-stealing scheduler).
- Tokio фактически имеет встроенную языковую поддержку продолжений (async / await) и множество относительно зрелых библиотек для потоков, асинхронных блокировок, каналов, отмены и т.д.
- Tokio известен как хорошо протестированный и широко используемый в экосистеме Rust.
- Tokio обычно сохраняет выполняемые задачи и будущие результаты (futures), которые они выполняют, в одном потоке исполнителя, что отлично для локальности кэша.
- Tokio хорошо документирован, активно поддерживается и постоянно улучшается. (Консоль Tokio была анонсирована, пока я писал этот блог).
Таким образом, использовать Tokio в качестве планировщика задач для CPU-интенсивных задач — это же очевидно, верно? НЕЕЕЕЕЕЕЕЕЕЕЕЕЕТ!
Распространенные возражения против использования Tokio
Оказывается, использование Tokio было довольно горячей темой, и я бы сказал, что еще не все на 100% убеждены, отсюда и эта статья. Мы много беспокоились об этом вопросе на ранних стадиях DataFusion и InfluxDB IOx. Вот некоторые распространенные возражения:
В документации Tokio сказано не делать этого:
Более старые версии документации Tokio (например, 1.10.0) включали (на мой взгляд) известное предостережение:
«Если ваш код является CPU-интенсивным и вы хотите ограничить количество потоков, используемых для его запуска, вы должны запускать его в другом пуле потоков, таком как Rayon».
Я считаю, что эта формулировка вызвала значительную путаницу как в нашей команде, так и в широком сообществе Rust. Многие люди поняли это так, что рантайм Tokio никогда не следует использовать для CPU-интенсивных задач. Ключевой момент на самом деле заключается в том, что один и тот же экземпляр Runtime (один и тот же пул потоков) не должен использоваться как для I/O, так и для CPU, и впоследствии мы уточнили intent документации (подробности в PR).
Кстати, в документации Tokio предлагается использовать Rayon для CPU-интенсивных задач. Rayon — отличный выбор для многих приложений, но он не поддерживает async, поэтому, если ваш код должен выполнять любой ввод-вывод, вам придется переступать через болезненную границу sync/async. Мне также было сложно сопоставить pull-based модель выполнения, где задача должна ждать, пока все входные данные будут готовы, прежде чем она сможет запуститься, с Rayon.
Хвостовые задержки будут преследовать вас
Мудрые люди говорят: «Использование Tokio для CPU-интенсивной работы увеличит хвостовые задержки ваших запросов, что неприемлемо». Но подождите! Вы можете сказать: «Хвостовые задержки? 🙄 Я пишу базу данных, и это звучит как академическая проблема для веб-серверов, находящихся под высокой нагрузкой…»
Не совсем: Рассмотрим проверку работоспособности (liveness check), де-факто обязательную в наши дни для систем, развернутых с помощью оркестровки контейнеров (кхм Kubernetes). Проверка того, что ваш процесс ведет себя хорошо, часто представляет собой HTTP-запрос к чему-то вроде /health. Если этот запрос sits в очереди задач где-то, потому что Tokio полностью использует ваш CPU для эффективного пережевывания массы задач обработки данных, Kubernetes не получает требуемого ответа «Все в порядке» и убивает ваш процесс.
Эта цепочка рассуждений приводит к классическому выводу, что, поскольку хвостовые задержки критичны, вы не можете использовать Tokio для CPU-интенсивных задач.
Однако, как советует документация Tokio, чтобы избежать уничтожения Kubernetes и друзьями при полной загрузке CPU, важно использовать отдельный пул потоков — один для задач, где «задержка важна», таких как ответ на /health, и один для CPU-интенсивных задач. Оптимальное количество потоков для этих пулов потоков зависит от ваших потребностей и является хорошей темой для отдельной статьи.
Возможно, если думать о Tokio Runtime как о сложном пуле потоков, идея использования разных экземпляров Runtime может показаться более приемлемой, и мы покажем, как это сделать, с помощью выделенного исполнителя ниже.
Высокие накладные расходы на задачу
Но «подождите!» — слышу я вас (или все слышат вас на Hacker News), — у Tokio высокие накладные расходы на задачу. Я совсем не удивлен, что люди могут создавать пулы потоков, которые прогоняют крошечные задачи быстрее, чем Tokio.
Однако я еще не видел такой системы, которой я бы доверял для своих рабочих нагрузок в production, ни одной, которая имела бы такую же надежную поддержку экосистемы.
К счастью, для многих рабочих нагрузок накладные расходы на задачу могут быть амортизированы с помощью «векторизованной обработки». Это модный способ сказать, что каждая задача обрабатывает тысячи строк за раз, а не одну. Конечно, нельзя сходить с ума; вам нужно разбивать работу на фрагменты разумного размера, и нельзя амортизировать все рабочие нагрузки. Однако для всех случаев, которые важны для моих приложений, накладные расходы на задачу Tokio теряются в шуме.
Как использовать Tokio для CPU-интенсивных задач?
Итак, давайте представим, что я убедил вас, что использовать Tokio для CPU-интенсивной работы — это нормально. Как это сделать?
Во-первых, что критически важно, ваш код должен следовать поговорке: «асинхронный код никогда не должен проводить много времени, не достигая .await», как объяснено в посте Alice Ryhl. Это нужно, чтобы дать планировщику шанс запланировать что-то еще, украсть работу и т.д.
Конечно, «много времени» зависит от вашего приложения; Ryhl рекомендует 10–100 микросекунд при оптимизации для хвостовых задержек ответа. Я думаю, что 10–100 миллисекунд также нормально для задач при оптимизации для CPU. Однако, поскольку мои расчетные накладные расходы Tokio на задачу составляют около ~10 наносекунд, почти невозможно даже измерить накладные расходы Tokio Runtime для задач в 10 миллисекунд.
Во-вторых, запускайте свои задачи в отдельном экземпляре Runtime. Как это сделать? Рад, что вы спросили.
Выделенный исполнитель (Dedicated Executor)
Вот упрощенная версия того, как мы запускаем задачи в отдельном Tokio Runtime в InfluxDB IOx. (Полную версию можно найти в нашем репозитории, и в ней есть дополнительная логика для чистого завершения и соединения.)
#![allow(unused)] fn main() { pub struct DedicatedExecutor { state: Arc<Mutex<State>>, } /// Запускает future (и любые `tasks`, которые `tokio::task::spawned` ими) /// на отдельном исполнителе Tokio struct State { /// Канал для запросов -- выделенный исполнитель принимает запросы /// отсюда и запускает их. requests: Option<std::sync::mpsc::Sender<Task>>, /// Поток, в котором установлен другой Tokio runtime /// и порождает задачи там thread: Option<std::thread::JoinHandle<()>>, } impl DedicatedExecutor { /// Создает новый `DedicatedExecutor` с выделенным Tokio /// исполнителем, который отделен от пула потоков, созданного через /// `[tokio::main]`. pub fn new(thread_name: &str, num_threads: usize) -> Self { let thread_name = thread_name.to_string(); let (tx, rx) = std::sync::mpsc::channel::<Task>(); let thread = std::thread::spawn(move || { // Создать новый Runtime для запуска задач let runtime = Tokio::runtime::Builder::new_multi_thread() .enable_all() .thread_name(&thread_name) .worker_threads(num_threads) // Понизить приоритет рабочих потоков ОС для приоритизации основного рантайма .on_thread_start(move || set_current_thread_priority_low()) .build() .expect("Creating Tokio runtime"); // Брать запросы задач из канала и отправлять их исполнителю runtime.block_on(async move { while let Ok(task) = rx.recv() { Tokio::task::spawn(async move { task.run().await; }); } }); }); let state = State { requests: Some(tx), thread: Some(thread), }; Self { state: Arc::new(Mutex::new(state)), } } // ... } }
Этот код создает новый std::thread, который создает отдельный многопоточный Tokio Runtime для запуска задач, а затем читает задачи из канала и порождает их в новом Runtime.
Примечание: Новый поток — это ключ. Если вы попытаетесь создать новый Runtime в основном потоке или в одном из потоков, созданных Tokio, вы получите ошибку, так как Runtime уже установлен.
Вот соответствующий код для отправки задачи во второй Runtime.
#![allow(unused)] fn main() { impl DedicatedExecutor { /// Запускает указанный Future (и любые задачи, которые он порождает) на /// `DedicatedExecutor`. pub fn spawn<T>(&self, task: T) -> Job<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static, { let (tx, rx) = tokio::sync::oneshot::channel(); let fut = Box::pin(async move { let task_output = task.await; tx.send(task_output).ok() }); let mut state = self.state.lock(); let task = Task { fut, }; if let Some(requests) = &mut state.requests { // завершится ошибкой, если кто-то начал завершение работы requests.send(task).ok(); } else { warn!("tried to schedule task on an executor that was shutdown"); } Job { rx } } } }
Job
Код выше использует обертку вокруг Future под названием Job, которая обрабатывает передачу результатов из выделенного исполнителя обратно в основной исполнитель, что выглядит так:
#![allow(unused)] fn main() { #[pin_project(PinnedDrop)] pub struct Job<T> { #[pin] rx: Receiver<T>, } impl<T> Future for Job<T> { type Output = Result<T, Error>; fn poll( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output> { let this = self.project(); this.rx.poll(cx) } } }
И это все! Вы можете найти весь код в этом Github gist.