Отмена и безопасность отмены
- Внутренняя vs внешняя отмена
- Потоки vs фьючерсы
drop = cancelтолько в точках await- Полезная функция, но все несколько резкая и неожиданная
- Другие механизмы отмены: abort, cancellation tokens
Безопасность отмены
- Не проблема безопасности памяти или состояния гонки
- Потеря данных или другие логические ошибки
- Разные определения/названия
- определение Tokio
- общее определение/безопасность остановки (halt safety)
- применение идеи реплицированного фьючерса
- Простая потеря данных
- Проблема возобновления
- Проблема с
selectили подобным в циклах - Разделение состояния между фьючерсом и контекстом как коренная причина
Отмена (cancellation) — это фундаментальная концепция в асинхронном программировании на Rust, которая имеет важные последствия для корректности программ.
Отмена и безопасность отмены
Внутренняя vs внешняя отмена
Внутренняя отмена происходит, когда фьючерс решает самостоятельно завершиться:
#![allow(unused)] fn main() { async fn may_cancel_internally() -> Option<String> { if some_condition().await { None // Внутренняя отмена } else { Some("result".to_string()) } } }
Внешняя отмена происходит, когда среда выполнения или другой код решает отменить фьючерс:
#![allow(unused)] fn main() { let handle = tokio::spawn(async_task()); handle.abort(); // Внешняя отмена }
Потоки vs фьючерсы
В отличие от потоков ОС, которые могут быть прерваны в любой точке выполнения, асинхронные задачи в Rust могут быть отменены только в точках await. Это делает отмену более предсказуемой, но все равно требующей осторожности.
drop = cancel только в точках await
Ключевое правило: фьючерс может быть отменен только когда он возвращает Poll::Pending. На практике это означает точки await:
#![allow(unused)] fn main() { async fn vulnerable_function() { step_one(); // Не может быть отменен здесь intermediate_work(); // Не может быть отменен здесь step_two().await; // МОЖЕТ быть отменен здесь ← more_work(); // Не может быть отменен здесь final_step().await; // МОЖЕТ быть отменен здесь ← } }
Полезная функция, но все несколько резкая и неожиданная
Отмена — это мощный инструмент для управления временем жизни задач, но она может быть неожиданной:
#![allow(unused)] fn main() { async fn process_data() -> Result<(), Error> { let data = read_data().await?; let processed = expensive_processing(data); // Вычислительно дорогая операция write_result(processed).await?; // Может быть отменена здесь! Ok(()) } }
Если отмена произойдет после expensive_processing но до write_result, результаты вычислений будут потеряны.
Другие механизмы отмены
abort для задач:
#![allow(unused)] fn main() { let handle = tokio::spawn(async_task()); // ... handle.abort(); // Немедленная отмена задачи }
Cancellation tokens для кооперативной отмены:
#![allow(unused)] fn main() { use tokio_util::sync::CancellationToken; let token = CancellationToken::new(); let cloned_token = token.clone(); tokio::spawn(async move { tokio::select! { _ = cloned_token.cancelled() => { println!("Задача отменена через токен"); } _ = async_work() => { println!("Задача завершена нормально"); } } }); // Позже... token.cancel(); // Сигнал отмены }
Безопасность отмены
Не проблема безопасности памяти или состояния гонки
Безопасность отмены не связана с безопасностью памяти Rust — отмена не может привести к неопределенному поведению. Однако она может привести к логическим ошибкам.
Потеря данных или другие логические ошибки
Пример потери данных:
#![allow(unused)] fn main() { async fn transfer_funds(from: &mut Account, to: &mut Account, amount: u64) -> Result<()> { from.balance -= amount; // Сняли деньги // ← Точка отмены! to.balance += amount; // Не выполнится, если отмена здесь Ok(()) } }
Разные определения/названия
Определение Tokio
Tokio определяет операцию как "безопасную к отмене", если она может быть безопасно отменена в любой точке await.
Общее определение/безопасность остановки (halt safety)
Более общее понятие — "безопасность остановки": код должен оставаться в согласованном состоянии, даже если его выполнение прервано в любой точке.
Применение идеи реплицированного фьючерса
Если фьючерс можно безопасно перезапустить после отмены (идемпотентность), то он безопасен к отмене.
Простая потеря данных
Самый простой вид небезопасности отмены — потеря данных:
#![allow(unused)] fn main() { async fn append_to_file(data: &[u8]) -> Result<()> { let mut file = tokio::fs::OpenOptions::new() .append(true) .open("log.txt") .await?; file.write_all(data).await?; // Может быть отменена после открытия файла file.sync_all().await?; // Но до синхронизации Ok(()) } }
Проблема возобновления
Код, который не может быть корректно возобновлен после отмены:
#![allow(unused)] fn main() { async fn process_stream(mut stream: impl Stream<Item = Data>) -> Result<()> { while let Some(data) = stream.next().await { // Отмена может произойти здесь process(data).await?; // Состояние обработки может быть потеряно } Ok(()) } }
Проблема с select или подобным в циклах
Использование select! в циклах особенно подвержено проблемам отмены:
#![allow(unused)] fn main() { async fn process_with_timeout(mut stream: impl Stream<Item = Data>) { loop { tokio::select! { Some(data) = stream.next() => { // Обрабатываем данные if let Err(_) = process(data).await { break; } } _ = tokio::time::sleep(Duration::from_secs(30)) => { println!("Таймаут"); break; } } } } }
Проблема: Если stream.next() завершится с ошибкой, но select! выберет ветку таймаута, ошибка будет потеряна.
Разделение состояния между фьючерсом и контекстом как коренная причина
Многие проблемы отмены возникают из-за разделения состояния:
#![allow(unused)] fn main() { struct Transaction { data: Vec<u8>, committed: bool, } impl Transaction { async fn commit(mut self) -> Result<()> { write_to_database(&self.data).await?; // ← Точка отмены! self.committed = true; // Не выполнится при отмене Ok(()) } } }
Решение: Атомарные операции или компенсирующие действия:
#![allow(unused)] fn main() { impl Transaction { async fn commit(mut self) -> Result<()> { // Атомарная операция - либо все, либо ничего let result = write_to_database(&self.data).await; match result { Ok(()) => { self.committed = true; Ok(()) } Err(e) => { // Откатываем изменения self.rollback().await; Err(e) } } } async fn rollback(&mut self) { // Компенсирующее действие if !self.committed { // Удаляем частично записанные данные delete_partial_data(&self.data).await; } } } }
Стратегии обеспечения безопасности отмены
1. Идемпотентные операции
Спроектируйте операции так, чтобы их можно было безопасно повторить:
#![allow(unused)] fn main() { async fn idempotent_operation(id: u64, data: &str) -> Result<()> { // Используем "upsert" вместо отдельного insert/update database.upsert(id, data).await } }
2. Компенсирующие действия
Для неидемпотентных операций предусмотрите откат:
#![allow(unused)] fn main() { async fn two_phase_operation() -> Result<()> { let temp_result = phase_one().await?; // Если отмена произойдет здесь, phase_one можно откатить match phase_two(temp_result).await { Ok(final_result) => Ok(final_result), Err(e) => { // Компенсирующее действие rollback_phase_one().await; Err(e) } } } }
3. Проверка состояния при возобновлении
#![allow(unused)] fn main() { async fn resumable_operation(state: &mut OperationState) -> Result<()> { // Проверяем, не была ли операция уже выполнена if state.is_completed { return Ok(()); } // Выполняем операцию perform_work().await?; state.is_completed = true; Ok(()) } }
4. Использование токенов отмены для кооперативного завершения
#![allow(unused)] fn main() { async fn cancellation_safe_operation( token: CancellationToken ) -> Result<()> { let checkpoint = || { if token.is_cancelled() { Err(Error::Cancelled) } else { Ok(()) } }; checkpoint()?; step_one().await; checkpoint()?; step_two().await; checkpoint()?; final_step().await; Ok(()) } }