Отмена и безопасность отмены

  • Внутренняя vs внешняя отмена
  • Потоки vs фьючерсы
  • drop = cancel только в точках await
  • Полезная функция, но все несколько резкая и неожиданная
  • Другие механизмы отмены: abort, cancellation tokens

Безопасность отмены

  • Не проблема безопасности памяти или состояния гонки
  • Потеря данных или другие логические ошибки
  • Разные определения/названия
    • определение Tokio
    • общее определение/безопасность остановки (halt safety)
    • применение идеи реплицированного фьючерса
  • Простая потеря данных
  • Проблема возобновления
  • Проблема с select или подобным в циклах
  • Разделение состояния между фьючерсом и контекстом как коренная причина

Отмена (cancellation) — это фундаментальная концепция в асинхронном программировании на Rust, которая имеет важные последствия для корректности программ.

Отмена и безопасность отмены

Внутренняя vs внешняя отмена

Внутренняя отмена происходит, когда фьючерс решает самостоятельно завершиться:

#![allow(unused)]
fn main() {
async fn may_cancel_internally() -> Option<String> {
    if some_condition().await {
        None // Внутренняя отмена
    } else {
        Some("result".to_string())
    }
}
}

Внешняя отмена происходит, когда среда выполнения или другой код решает отменить фьючерс:

#![allow(unused)]
fn main() {
let handle = tokio::spawn(async_task());
handle.abort(); // Внешняя отмена
}

Потоки vs фьючерсы

В отличие от потоков ОС, которые могут быть прерваны в любой точке выполнения, асинхронные задачи в Rust могут быть отменены только в точках await. Это делает отмену более предсказуемой, но все равно требующей осторожности.

drop = cancel только в точках await

Ключевое правило: фьючерс может быть отменен только когда он возвращает Poll::Pending. На практике это означает точки await:

#![allow(unused)]
fn main() {
async fn vulnerable_function() {
    step_one();           // Не может быть отменен здесь
    intermediate_work();  // Не может быть отменен здесь
    step_two().await;     // МОЖЕТ быть отменен здесь ←
    more_work();          // Не может быть отменен здесь
    final_step().await;   // МОЖЕТ быть отменен здесь ←
}
}

Полезная функция, но все несколько резкая и неожиданная

Отмена — это мощный инструмент для управления временем жизни задач, но она может быть неожиданной:

#![allow(unused)]
fn main() {
async fn process_data() -> Result<(), Error> {
    let data = read_data().await?;
    let processed = expensive_processing(data); // Вычислительно дорогая операция
    write_result(processed).await?; // Может быть отменена здесь!
    Ok(())
}
}

Если отмена произойдет после expensive_processing но до write_result, результаты вычислений будут потеряны.

Другие механизмы отмены

abort для задач:

#![allow(unused)]
fn main() {
let handle = tokio::spawn(async_task());
// ...
handle.abort(); // Немедленная отмена задачи
}

Cancellation tokens для кооперативной отмены:

#![allow(unused)]
fn main() {
use tokio_util::sync::CancellationToken;

let token = CancellationToken::new();
let cloned_token = token.clone();

tokio::spawn(async move {
    tokio::select! {
        _ = cloned_token.cancelled() => {
            println!("Задача отменена через токен");
        }
        _ = async_work() => {
            println!("Задача завершена нормально");
        }
    }
});

// Позже...
token.cancel(); // Сигнал отмены
}

Безопасность отмены

Не проблема безопасности памяти или состояния гонки

Безопасность отмены не связана с безопасностью памяти Rust — отмена не может привести к неопределенному поведению. Однако она может привести к логическим ошибкам.

Потеря данных или другие логические ошибки

Пример потери данных:

#![allow(unused)]
fn main() {
async fn transfer_funds(from: &mut Account, to: &mut Account, amount: u64) -> Result<()> {
    from.balance -= amount;    // Сняли деньги
    // ← Точка отмены!
    to.balance += amount;      // Не выполнится, если отмена здесь
    Ok(())
}
}

Разные определения/названия

Определение Tokio

Tokio определяет операцию как "безопасную к отмене", если она может быть безопасно отменена в любой точке await.

Общее определение/безопасность остановки (halt safety)

Более общее понятие — "безопасность остановки": код должен оставаться в согласованном состоянии, даже если его выполнение прервано в любой точке.

Применение идеи реплицированного фьючерса

Если фьючерс можно безопасно перезапустить после отмены (идемпотентность), то он безопасен к отмене.

Простая потеря данных

Самый простой вид небезопасности отмены — потеря данных:

#![allow(unused)]
fn main() {
async fn append_to_file(data: &[u8]) -> Result<()> {
    let mut file = tokio::fs::OpenOptions::new()
        .append(true)
        .open("log.txt")
        .await?;
    
    file.write_all(data).await?; // Может быть отменена после открытия файла
    file.sync_all().await?;      // Но до синхронизации
    
    Ok(())
}
}

Проблема возобновления

Код, который не может быть корректно возобновлен после отмены:

#![allow(unused)]
fn main() {
async fn process_stream(mut stream: impl Stream<Item = Data>) -> Result<()> {
    while let Some(data) = stream.next().await { // Отмена может произойти здесь
        process(data).await?; // Состояние обработки может быть потеряно
    }
    Ok(())
}
}

Проблема с select или подобным в циклах

Использование select! в циклах особенно подвержено проблемам отмены:

#![allow(unused)]
fn main() {
async fn process_with_timeout(mut stream: impl Stream<Item = Data>) {
    loop {
        tokio::select! {
            Some(data) = stream.next() => {
                // Обрабатываем данные
                if let Err(_) = process(data).await {
                    break;
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                println!("Таймаут");
                break;
            }
        }
    }
}
}

Проблема: Если stream.next() завершится с ошибкой, но select! выберет ветку таймаута, ошибка будет потеряна.

Разделение состояния между фьючерсом и контекстом как коренная причина

Многие проблемы отмены возникают из-за разделения состояния:

#![allow(unused)]
fn main() {
struct Transaction {
    data: Vec<u8>,
    committed: bool,
}

impl Transaction {
    async fn commit(mut self) -> Result<()> {
        write_to_database(&self.data).await?;
        // ← Точка отмены!
        self.committed = true; // Не выполнится при отмене
        Ok(())
    }
}
}

Решение: Атомарные операции или компенсирующие действия:

#![allow(unused)]
fn main() {
impl Transaction {
    async fn commit(mut self) -> Result<()> {
        // Атомарная операция - либо все, либо ничего
        let result = write_to_database(&self.data).await;
        match result {
            Ok(()) => {
                self.committed = true;
                Ok(())
            }
            Err(e) => {
                // Откатываем изменения
                self.rollback().await;
                Err(e)
            }
        }
    }
    
    async fn rollback(&mut self) {
        // Компенсирующее действие
        if !self.committed {
            // Удаляем частично записанные данные
            delete_partial_data(&self.data).await;
        }
    }
}
}

Стратегии обеспечения безопасности отмены

1. Идемпотентные операции

Спроектируйте операции так, чтобы их можно было безопасно повторить:

#![allow(unused)]
fn main() {
async fn idempotent_operation(id: u64, data: &str) -> Result<()> {
    // Используем "upsert" вместо отдельного insert/update
    database.upsert(id, data).await
}
}

2. Компенсирующие действия

Для неидемпотентных операций предусмотрите откат:

#![allow(unused)]
fn main() {
async fn two_phase_operation() -> Result<()> {
    let temp_result = phase_one().await?;
    
    // Если отмена произойдет здесь, phase_one можно откатить
    match phase_two(temp_result).await {
        Ok(final_result) => Ok(final_result),
        Err(e) => {
            // Компенсирующее действие
            rollback_phase_one().await;
            Err(e)
        }
    }
}
}

3. Проверка состояния при возобновлении

#![allow(unused)]
fn main() {
async fn resumable_operation(state: &mut OperationState) -> Result<()> {
    // Проверяем, не была ли операция уже выполнена
    if state.is_completed {
        return Ok(());
    }
    
    // Выполняем операцию
    perform_work().await?;
    state.is_completed = true;
    
    Ok(())
}
}

4. Использование токенов отмены для кооперативного завершения

#![allow(unused)]
fn main() {
async fn cancellation_safe_operation(
    token: CancellationToken
) -> Result<()> {
    let checkpoint = || {
        if token.is_cancelled() {
            Err(Error::Cancelled)
        } else {
            Ok(())
        }
    };
    
    checkpoint()?;
    step_one().await;
    
    checkpoint()?;
    step_two().await;
    
    checkpoint()?;
    final_step().await;
    
    Ok(())
}
}