Древовидная структурированная конкурентность
2023-07-01 Yoshua Wuyts Источник
- Что такое структурированная конкурентность?
- Неструктурированная конкурентность: пример
- Структурированная конкурентность: пример
- Что может случиться в худшем случае?
- Применение структурированной конкурентности в ваших программах
- Паттерн: управляемые фоновые задачи
- Гарантирование структуры
- Заключение
Довольно долго я пытался найти хороший способ объяснить, что такое структурированная конкурентность и как она применяется в Rust. Я придумывал эффектные фразы вроде: «Структурированная конкурентность — это структурированное программирование, примененное к примитивам управления параллельным выполнением». Но это требует от меня начать объяснять, что такое структурное программирование, и внезапно я оказываюсь погруженным в концепцию на 2000 слов, которая кажется естественной большинству людей, пишущих программы сегодня1.
Вместо этого я хочу попробовать нечто иное. В этом посте я хочу дать вам практическое введение в структурированную конкурентность. Я сделаю все возможное, чтобы объяснить, что это такое, почему это важно и как вы можете начать применять это в своих проектах на Rust уже сегодня. Структурированная конкурентность — это призма, которую я использую почти во всех своих рассуждениях об асинхронном Rust, и я думаю, что она может помочь и другим. Так что давайте погрузимся.
Этот пост предполагает некоторое знакомство с асинхронным Rust и отменой асинхронных операций. Если вы еще не знакомы, возможно, будет полезно бегло просмотреть предыдущие посты на эту тему.
Что такое структурированная конкурентность?
Структурированная конкурентность — это свойство вашей программы. Это не просто любая структура; структура программы гарантированно представляет собой дерево, независимо от того, сколько конкурентности происходит внутри2. Хороший способ думать об этом: если бы вы могли построить живой граф вызовов вашей программы в виде ряда отношений, он бы аккуратно образовывал дерево. Без циклов3. Без висячих узлов. Просто одно дерево.
Рис. 1. Стрелки указывают от родительских узлов к дочерним. Нет циклов. Родитель может иметь несколько детей. Но у ребенка всегда один родитель — кроме корневого узла.
И эта структура, по крайней мере в асинхронном Rust, обеспечивает три ключевых свойства:
- Распространение отмены: Когда вы отбрасываете (drop) future для его отмены, гарантируется, что все future под ним также отменяются.
- Распространение ошибок: Когда ошибка возникает где-то внизу графа вызовов, ее всегда можно передать вверх вызывающим сторонам, пока не найдется вызывающий, готовый обработать ее.
- Порядок операций: Когда функция возвращает результат, вы знаете, что она закончила работу. Никаких сюрпризов, что что-то все еще происходит спустя долгое время после того, как вы подумали, что функция завершилась.
Эти свойства вместе приводят к так называемой «модели выполнения черного ящика»: при структурированной модели вычислений вам не нужно знать ничего о внутреннем устройстве функций, которые вы вызываете, потому что их поведение гарантировано. Функция вернет результат, когда закончит работу, отменит всю работу, когда вы попросите, и вы всегда получите ошибку, если есть что-то, что нужно обработать. И как следствие, код в этой модели является композируемым.
Рис. 2. При структурированной конкурентности у каждого future есть родитель, отмена распространяется вниз, а ошибки — вверх. Когда future возвращает результат, вы можете быть уверены, что он закончил работу.
Если ваша модель конкурентности неструктурированна, то у вас нет этих гарантий. Поэтому, чтобы гарантировать, скажем, правильное распространение отмены, вам нужно будет проверять внутреннее устройство каждой функции, которую вы вызываете. Код в этой модели не является композируемым и требует ручных проверок и специальных решений. Это трудоемко и подвержено ошибкам.
Неструктурированная конкурентность: пример
Давайте начнем с реализации классического паттерна конкурентности: «гонка» (race). Но вместо использования структурированных примитивов мы можем использовать основы неструктурированного программирования: почтенные task::spawn и канал (channel). «Гонка» работает так: она принимает два future, и мы пытаемся получить вывод того, который завершится первым. Мы могли бы написать это примерно так:
#![allow(unused)] fn main() { use async_std::{channel, task}; let (sender0, receiver) = channel::bounded(1); let sender1 = sender0.clone(); task::spawn(async move { // 👈 Задача "C" task::sleep(Duration::from_millis(100)); sender1.send("first").await; }); task::spawn(async move { // 👈 Задача "B" task::sleep(Duration::from_millis(100)); sender0.send("second").await; }); let msg = receiver.recv().await; // 👈 Future "A" println!("{msg}"); }
Хотя это правильно реализует семантику «гонки», это не обрабатывает отмену. Если одна из ветвей завершится, мы бы хотели отменить другую. И если содержащая функция отменена, обе вычисления должны быть отменены. Из-за того, как мы структурировали программу, ни одна задача не привязана к родительскому future, поэтому мы не можем отменить ни одно вычисление напрямую. Вместо этого решением было бы придумать какой-то дизайн, используя больше каналов, привязать handle'ы — или мы могли бы переписать это, используя структурированные примитивы.
Рис. 3. Вы можете создать операцию «гонки», комбинируя задачи и каналы. Данные могут выходить из задач к вызывающей стороне. Но поскольку задачи не привязаны к родительской задаче, отмена не распространяется.
Структурированная конкурентность: пример
Мы можем переписать пример выше, используя структурированные примитивы. Вместо того чтобы самостоятельно реализовывать «гонку» с помощью задач и каналов, мы должны использовать примитив «гонки», который реализует эту семантику за нас — и правильно обрабатывает отмену. Используя библиотеку futures-concurrency, мы можем сделать это следующим образом:
#![allow(unused)] fn main() { use futures_concurrency::prelude::*; use async_std::task; let c = async { // 👈 Future "C" task::sleep(Duration::from_millis(100)); "first" }; let b = async { // 👈 Future "B" task::sleep(Duration::from_millis(100)); "second" }; let msg = (c, b).race().await; // 👈 Future "A" println!("{msg}"); }
Когда один future завершается здесь, другой future отменяется. И если future Race будет отброшен, то оба future отменяются. Оба future имеют родительский future при выполнении. Отмена распространяется вниз. И хотя в этом примере нет ошибок, если бы мы работали с операциями, которые могут завершиться ошибкой, то ранние возвраты привели бы к раннему завершению future — и ошибки обрабатывались бы как ожидалось.
Рис. 4. Используя структурированный примитив «гонки», все дочерние future привязаны к родительскому future. Что позволяет распространять как отмену, так и ошибки. И операция не вернется, пока все дочерние future не будут отброшены.
До сих пор мы рассматривали только операцию «гонки», которая кодирует: «Ждать завершения первого future, затем отменить другой». Но существуют и другие асинхронные операции конкурентности, такие как:
- join: ждать завершения всех future.
- race_ok: ждать завершения первого future, который вернет
Ok. - try_join: ждать завершения всех future или вернуться раньше, если есть ошибка.
- merge: ждать завершения всех future и выдавать элементы из потока (stream), как только они будут готовы.
Есть еще несколько, таких как «zip», «unzip» и «chain», а также динамические примитивы конкурентности, такие как «группа задач» (task group), «группа задач с ошибками» (fallible task group) и другие. Суть в том, что набор примитивов конкурентности ограничен. Но их можно перекомбинировать таким образом, чтобы выразить любую форму конкурентности, какую вы захотите. Не unlike тому, как если язык программирования поддерживает ветвление, циклы и вызовы функций, вы можете закодировать почти любую логику управления потоком, — без необходимости использовать «goto».
Что может случиться в худшем случае?
Люди иногда спрашивают: Что самое худшее может случиться, когда у вас нет структурированной конкурентности? Возможен ряд плохих исходов, включая, но не ограничиваясь: потерю данных, повреждение данных и утечки памяти.
В то время как Rust защищает от состояний гонки данных (data races), которые подпадают под категорию «безопасность памяти» (memory safety), Rust не может защитить вас от логических ошибок (logic bugs). Например: если вы выполняете операцию записи внутри задачи, handle которой не присоединен (joined), то вам нужно найти какой-то альтернативный механизм, чтобы гарантировать порядок этой операции по отношению к остальной части программы. Если вы ошиблись, вы можете случайно записать в закрытый ресурс и потерять данные. Или выполнить запись не в том порядке и случайно повредить ресурс4. Эти виды ошибок не относятся к тому же классу, что и ошибки безопасности памяти. Но, тем не менее, они серьезны, и их можно смягчить с помощью принципиального проектирования API.
Применение структурированной конкурентности в ваших программах
task::spawn
При использовании или создании асинхронных API в Rust вы должны задать себе следующие вопросы, чтобы обеспечить структурированную конкурентность:
- Распространение отмены: Если этот future или функция будут отброшены, будет ли отмена распространена на все дочерние future?
- Распространение ошибок: Если ошибка произойдет где-либо в этом future, можем ли мы либо обработать ее напрямую, либо передать ее вызывающей стороне?
- Порядок операций: Когда эта функция вернет результат, не будет ли больше работы, продолжающейся в фоне?
Если все эти свойства истинны, то после выхода из функция завершила выполнение, и все в порядке. Однако это подводит нас к серьезной проблеме в современной асинхронной экосистеме: ни async-std, ни tokio не предоставляют функцию spawn, которая была бы структурированной. Если вы отбросите handle задачи, задача не отменяется, а вместо этого отсоединяется (detached) и продолжает работать в фоне. Это означает, что отмена не распространяется автоматически через границы задач, что делает ее неструктурированной.
Библиотека smol приближается к этому. В ней есть реализация задачи, которая приближает нас к семантике «отмена при отбрасывании» (cancel on drop) из коробки. Хотя она еще не доводит нас до конца, потому что не гарантирует порядок операций. Когда задача smol Task отбрасывается, не гарантируется, что задача была отменена; гарантируется только, что задача будет отменена в какой-то момент в будущем.
async drop
Что подводит нас к самому большому недостающему элементу в истории структурированной конкурентности асинхронного Rust: отсутствию async Drop в языке. Задачи smol имеют асинхронный метод cancel, который завершается только тогда, когда задача была успешно отменена. В идеале мы могли бы вызывать этот метод в деструкторе и ждать его. Но чтобы сделать это сегодня, нам нужно было бы блокировать поток, а это может привести к проблемам с пропускной способностью. Нет, на практике для хорошей работы нам действительно нужны асинхронные деструкторы5.
Что можно сделать сегодня?
Но хотя мы еще не можем тривиально выполнить все требования для асинхронной структурированной конкурентности для асинхронных задач, не все потеряно. Без async Drop мы уже можем достичь 2/3 требований для порождения задач сегодня. И если вы используете среду выполнения (runtime), отличную от smol, адаптировать функцию spawn для работы как в smol — не слишком сложная работа. Но большей части конкурентности не нужны задачи, потому что она не является динамической. Для этого вы можете взглянуть на библиотеку futures-concurrency, которая реализует композируемые примитивы для структурированной конкурентности.
Если вы хотите внедрить структурированную конкурентность в своей кодовой базе сегодня, вы можете начать с ее принятия для не основанной на задачах конкурентности. А для конкурентности на основе задач вы можете принять модель порождения задач smol, чтобы получить большую часть преимуществ структурированной конкурентности уже сегодня. И в конечном итоге, мы надеемся, мы сможем добавить какую-либо форму async Drop в язык, чтобы закрыть оставшиеся пробелы.
Паттерн: управляемые фоновые задачи
Люди часто спрашивают, как они могут реализовать «фоновые задачи» при структурированной конкурентности. Это используется в сценариях, таких как обработчик HTTP-запросов, который также хочет отправить часть телеметрии. Вместо того чтобы блокировать отправку ответа на телеметрии, он порождает «фоновую задачу» для отправки телеметрии в фоне и немедленно возвращает результат из запроса. Это может выглядеть примерно так:
#![allow(unused)] fn main() { let mut app = tide::new(); app.at("/").post(|_| async move { task::spawn(async { // 👈 Порождение фоновой задачи… let _res = send_telemetry(data, more_data).await; // … что, если `res` — это `Err`? Как нам следует обрабатывать ошибки здесь? }); Ok("hello world") // 👈 …и немедленный возврат после. }); app.listen("127.0.0.1:8080").await?; }
Фраза «фоновая задача» кажется вежливой и ненавязчивой. Но с структурированной точки зрения она представляет собой вычисление без родителя — это висячая задача (dangling task). Суть паттерна, с которым мы имеем дело, заключается в том, что мы хотим создать вычисление, которое переживает время жизни обработчика запроса. Мы можем решить это, вместо создания висячей задачи, отправив ее в очередь задач или группу задач, которая переживает обработчик запроса. В отличие от висячей задачи, очередь задач или группа задач сохраняет структурированную конкурентность. Там, где висячая задача не имеет родительского future и становится недостижимой, используя очередь задач, мы передаем владение future другому объекту, который переживает текущую, более кратковременную область видимости.
Я слышал, как люди ранее утверждали, что task::spawn совершенно структурирован, если думать о нем как о порождении в какой-то недостижимый, глобальный пул задач. Но вопрос должен быть не в том, порождаются ли задачи в пуле задач, а в том, каково отношение этих задач к остальной части программы. Потому что мы не можем отменить и воссоздать недостижимый пул задач. Мы также не можем получать ошибки из этого пула или ждать завершения всех задач в нем. Это не обеспечивает свойства, которые мы хотим от структурированной конкурентности, — поэтому мы не должны считать это структурированным.
Я не чувствую, что в экосистеме есть какие-то великолепные решения для этого yet — отчасти ограниченные тем, что нам нужны «задачи с областью видимости» (scoped tasks), которые basically требуют линейных деструкторов для функционирования. Но существуют другие эксперименты, поэтому мы можем использовать их плюс каналы, чтобы собрать что-то, что даст нам то, что мы хотим:
⚠️ Примечание: Этот код не считается «хорошим» автором и используется лишь как пример, чтобы показать, что это возможно написать сегодня. Необходима дополнительная работа по проектированию, чтобы сделать это эргономичным. ⚠️
#![allow(unused)] fn main() { // Создаем канал для отправки и получения future. let (sender, receiver) = async_channel::unbounded(); // Создаем структурированную группу задач на верхнем уровне, рядом с HTTP-сервером // // Если любая из порожденных задач вернет ошибку, все активные задачи отменяются, // и ошибка возвращается через handle. let telemetry_handle = async_task_group::group(|group| async move { while let Some(telemetry_future) = receiver.next().await { group.spawn(async move { telemetry_future.await?; // 👈 Передаем ошибки вверх Ok(()) }); } Ok(group) }); // Создаем состояние приложения для нашего HTTP-сервера #[derive(Clone)] struct State { sender: async_channel::Sender<impl Future<Result<_>>>, } // Создаем HTTP-сервер let mut app = tide::new(); app.at("/").post(|req: Request<State>| async move { state.sender.send(async { // 👈 Отправляет future в цикл обработчика… send_telemetry(data, more_data).await?; Ok(()) }).await; Ok("hello world") // 👈 …и немедленно возвращается после. }); // Одновременно выполняем и HTTP-сервер, и обработчик телеметрии, // и если один из них перестает работать, другой тоже останавливается. (app.listen("127.0.0.1:8080"), telemetry_handle).race().await?; }
Как я сказал: нам нужно сделать гораздо больше работы над API, чтобы сравниться с удобством простого запуска висячей задачи. Но то, чего нам не хватает в удобстве API, мы компенсируем семантикой. В отличие от нашего предыдущего примера, это будет правильно распространять отмену и ошибки, и каждым выполняющимся future владеет родительский future. Мы могли бы даже пойти дальше и реализовать такие вещи, как обработчики повторов (retry-handlers) с квотами ошибок поверх этого, чтобы создать более устойчивую систему. Но, надеюсь, этого уже достаточно, чтобы передать идею того, что мы могли бы делать с этим.
Гарантирование структуры
Я уже некоторое время задаю себе вопрос: «Возможно ли для Rust обеспечить соблюдение структурированной конкурентности на уровне языка и библиотек?» Я не верю, что это то, что мы можем гарантировать со стороны языка. Но это то, что мы можем гарантировать для библиотечного кода Rust и сделать так, чтобы большая часть асинхронного кода по умолчанию была структурированной.
Причина, по которой я не верю, что принципиально возможно гарантировать структуру на уровне языка, заключается в том, что в Rust можно выразить любую программу, включая неструктурированные программы. Future, каналы и задачи, существующие сегодня, — это все просто обычные библиотечные типы. Если бы мы захотели обеспечить структуру со стороны языка, нам нужно было бы найти способ запретить создание этих библиотек — и это кажется невозможным для языка общего назначения6.
Вместо этого мне кажется более практичным принять древовидную структурированную конкурентность как модель, которой мы следуем для асинхронного Rust. Не как гарантию безопасности памяти, а как дисциплину проектирования, которую мы применяем ко всему асинхронному Rust. API, которые неструктурированны, не должны добавляться в stdlib. И наши инструменты должны знать, что неструктурированный код может существовать, чтобы они могли помечать его, когда встречают.
Заключение
В этом посте я показал, что такое (древовидная) структурированная конкурентность, почему она важна для корректности и как вы можете применять ее в своих программах. Я надеюсь, что, определив структурированную конкурентность в терминах гарантий о распространении ошибок и отмены, мы сможем создать практическую модель для людей, чтобы рассуждать об асинхронном Rust.
Как недавно сообщила Google, асинхронный Rust — один из самых сложных аспектов Rust для изучения. Вероятно, отсутствие структуры в асинхронном коде Rust сегодня не помогло. В асинхронном коде сегодня ни отмена, ни ошибки не гарантированно распространяются. Это означает, что если вы хотите надежно комбинировать код, вам нужно знать о внутреннем устройстве кода, который вы используете. Приняв (древовидную) структурированную модель конкурентности, эти свойства могут быть гарантированы с самого начала, что, в свою очередь, сделает асинхронный Rust более легким для понимания и обучения. Потому что «Если это компилируется, это работает» должно относиться и к асинхронному Rust.
Спасибо Ирине Шестак за иллюстрации и вычитку этого поста.
Примечания
-
Если вам интересно структурированное программирование, я могу рекомендовать писания Дейкстры на эту тему. Большинство языков программирования, которые мы используем сегодня, структурированы, поэтому читать о времени, когда это было не так, действительно интересно. ↩
-
Когда я исследовал эту тему в прошлом году, я нашел статью, кажется, из 80-х, в которой использовалась фраза «tree-structured concurrency». Я не смог найти ее снова к моменту написания этого поста, но я помню, что твитил о ней, потому что раньше не видел этот термин и думал, что он очень полезен! ↩
-
Да, да, рекурсия может заставлять функции вызывать самих себя — или даже вызывать самих себя через прокси. Под «живым графом вызовов» я подразумеваю его так, как flame charts визуализируют вызовы функций. Рекурсия визуализируется путем наложения вызовов функций друг на друга. Та же идея применима здесь путем добавления новых асинхронных узлов как детей существующих узлов. Акцент здесь сделан именно на живом графе вызовов, а не на логическом. ↩
-
На предыдущей работе мы столкнулись именно с этим в клиенте базы данных: у нас были проблемы с правильным распространением отмены, что означало, что протокол соединения мог быть поврежден, потому что мы не сбрасывали (flush) сообщения, когда должны были. ↩
-
Асинхронная отмена — едва ли единственная мотивация для
async Drop. Это также мешает нам кодировать базовые вещи, такие как: «сбросить эту операцию при отбрасывании» (flush on drop) — что-то, что мы можем кодировать в неасинхронном Rust сегодня. ↩ -
Пример этого из структурированного программирования: Rust — структурированный язык. Ассемблер — не структурированный язык. Вы можете реализовать интерпретатор ассемблера полностью на безопасном Rust — meaning вы можете выразить неструктурированный код на структурированном языке. Я мог бы показать примеры этого, но, эх, я надеюсь, общая линия рассуждений здесь имеет смысл. ↩