Корректное завершение работы

Цель этой страницы - дать обзор того, как правильно реализовать завершение работы в асинхронных приложениях.

Обычно есть три части реализации корректного завершения работы:

  • Определение момента для завершения работы.
  • Уведомление каждой части программы о необходимости завершения работы.
  • Ожидание завершения работы других частей программы.

Остальная часть статьи пройдет через эти части. Реальная реализация подхода, описанного здесь, может быть найдена в mini-redis, в частности в файлах src/server.rs и src/shutdown.rs.

Определение момента для завершения работы

Это, конечно, будет зависеть от приложения, но одним очень распространенным критерием завершения работы является получение приложением сигнала от операционной системы. Это происходит, например, когда вы нажимаете ctrl+c в терминале во время работы программы. Чтобы обнаружить это, Tokio предоставляет функцию tokio::signal::ctrl_c, которая будет ждать до получения такого сигнала. Вы можете использовать ее так:

use tokio::signal;

#[tokio::main]
async fn main() {
    // ... создаем приложение как отдельную задачу ...

    match signal::ctrl_c().await {
        Ok(()) => {},
        Err(err) => {
            eprintln!("Unable to listen for shutdown signal: {}", err);
            // мы также завершаем работу в случае ошибки
        },
    }

    // отправляем сигнал завершения работы приложению и ждем
}

Если у вас есть несколько условий завершения работы, вы можете использовать mpsc канал для отправки сигнала завершения работы в одно место. Затем вы можете использовать select на ctrl_c и канале. Например:

use tokio::signal;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();

    // ... создаем приложение как отдельную задачу ...
    //
    // приложение использует shutdown_send в случае, если завершение работы было инициировано изнутри
    // приложения

    tokio::select! {
        _ = signal::ctrl_c() => {},
        _ = shutdown_recv.recv() => {},
    }

    // отправляем сигнал завершения работы приложению и ждем
}

Уведомление о необходимости завершения работы

Когда вы хотите сообщить одной или нескольким задачам о завершении работы, вы можете использовать [Токены отмены][cancellation-tokens]. Эти токены позволяют уведомлять задачи, что они должны завершиться в ответ на запрос отмены, что упрощает реализацию корректного завершения работы.

Чтобы разделить CancellationToken между несколькими задачами, вы должны клонировать его. Это связано с правилом единоличного владения, которое требует, чтобы каждое значение имело единственного владельца. При клонировании токена вы получаете другой токен, неотличимый от оригинала; если один отменен, то другой также отменен. Вы можете сделать столько клонов, сколько нужно, и когда вы вызываете cancel на одном из них, все они отменяются.

Вот шаги для использования CancellationToken в нескольких задачах:

  1. Сначала создайте новый CancellationToken.
  2. Затем создайте клон оригинального CancellationToken, вызвав метод clone на оригинальном токене. Это создаст новый токен, который может использоваться другой задачей.
  3. Передайте оригинальный или клонированный токен задачам, которые должны реагировать на запросы отмены.
  4. Когда вы хотите корректно завершить задачи, вызовите метод cancel на оригинальном или клонированном токене. Любая задача, слушающая запрос отмены на оригинальном или клонированном токене, будет уведомлена о завершении работы.

Вот фрагмент кода, демонстрирующий вышеупомянутые шаги:

// Шаг 1: Создаем новый CancellationToken
let token = CancellationToken::new();

// Шаг 2: Клонируем токен для использования в другой задаче
let cloned_token = token.clone();

// Задача 1 - Ждем отмены токена или долгого времени
let task1_handle = tokio::spawn(async move {
    tokio::select! {
        // Шаг 3: Используем клонированный токен для прослушивания запросов отмены
        _ = cloned_token.cancelled() => {
            // Токен был отменен, задача может завершиться
        }
        _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
            // Долгая работа завершена
        }
    }
});

// Задача 2 - Отменяем оригинальный токен после небольшой задержки
tokio::spawn(async move {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    // Шаг 4: Отменяем оригинальный или клонированный токен, чтобы уведомить другие задачи о корректном завершении работы
    token.cancel();
});

// Ждем завершения задач
task1_handle.await.unwrap()

С Токенами отмены вам не нужно немедленно завершать задачу при отмене токена. Вместо этого вы можете выполнить процедуру завершения работы перед завершением задачи, такую как сброс данных в файл или базу данных, или отправка сообщения о завершении работы по соединению.

Ожидание завершения работы

После того как вы сообщили другим задачам о завершении работы, вам нужно будет дождаться их завершения. Один простой способ сделать это - использовать трекер задач. Трекер задач - это коллекция задач. Метод wait трекера задач дает вам future, который разрешается только после того, как все содержащиеся в нем future разрешатся и трекер задач будет закрыт.

Следующий пример создаст 10 задач, затем использует трекер задач для ожидания их завершения.

use std::time::Duration;
use tokio::time::sleep;
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    for i in 0..10 {
        tracker.spawn(some_operation(i));
    }

    // Как только мы создали все, мы закрываем трекер.
    tracker.close();

    // Ждем завершения всего.
    tracker.wait().await;

    println!("Это печатается после всех задач.");
}

async fn some_operation(i: u64) {
    sleep(Duration::from_millis(100 * i)).await;
    println!("Задача {} завершается.", i);
}