Совместимость с синхронным кодом

В большинстве примеров использования Tokio мы помечаем главную функцию #[tokio::main] и делаем весь проект асинхронным.

В некоторых случаях может потребоваться запустить небольшую часть синхронного кода. Для получения дополнительной информации об этом см. spawn_blocking.

В других случаях может быть проще структурировать приложение как в основном синхронное, с меньшими или логически отдельными асинхронными частями. Например, приложение с графическим интерфейсом может захотеть запустить код GUI в основном потоке и запустить среду выполнения Tokio рядом с ним в другом потоке.

На этой странице объясняется, как можно изолировать async/await до небольшой части вашего проекта.

Во что раскрывается #[tokio::main]

Макрос #[tokio::main] - это макрос, который заменяет вашу главную функцию на не-асинхронную главную функцию, которая запускает среду выполнения и затем вызывает ваш код. Например, это:

#[tokio::main]
async fn main() {
    println!("Hello world");
}

превращается в это:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

макросом. Чтобы использовать async/await в наших собственных проектах, мы можем сделать что-то подобное, где мы используем метод block_on для входа в асинхронный контекст, где это уместно.

Синхронный интерфейс для mini-redis

В этом разделе мы рассмотрим, как построить синхронный интерфейс для mini-redis, сохраняя объект Runtime и используя его метод block_on. В следующих разделах мы обсудим некоторые альтернативные подходы и когда следует использовать каждый подход.

Интерфейс, который мы будем оборачивать, - это асинхронный тип Client. У него есть несколько методов, и мы реализуем блокирующую версию следующих методов:

Для этого мы вводим новый файл с именем src/clients/blocking_client.rs и инициализируем его структурой-оберткой вокруг асинхронного типа Client:

use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;

pub use crate::clients::client::Message;

/// Установленное соединение с сервером Redis.
pub struct BlockingClient {
    /// Асинхронный `Client`.
    inner: crate::clients::Client,

    /// Среда выполнения `current_thread` для выполнения операций над
    /// асинхронным клиентом в блокирующем режиме.
    rt: Runtime,
}

impl BlockingClient {
    pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;

        // Вызываем асинхронный метод connect с использованием среды выполнения.
        let inner = rt.block_on(crate::clients::Client::connect(addr))?;

        Ok(BlockingClient { inner, rt })
    }
}

Здесь мы включили функцию-конструктор как наш первый пример того, как выполнять асинхронные методы в не-асинхронном контексте. Мы делаем это с помощью метода block_on для типа Tokio Runtime, который выполняет асинхронный метод и возвращает его результат.

Важная деталь - использование среды выполнения current_thread. Обычно при использовании Tokio вы бы использовали среду выполнения по умолчанию multi_thread, которая создает кучу фоновых потоков, чтобы она могла эффективно запускать много вещей одновременно. Для нашего случая использования мы будем делать только одну вещь за раз, поэтому мы ничего не выиграем от запуска нескольких потоков. Это делает среду выполнения current_thread идеальным вариантом, поскольку она не создает никаких потоков.

Вызов enable_all включает драйверы IO и таймеров в среде выполнения Tokio. Если они не включены, среда выполнения не может выполнять IO или использовать таймеры.

предупреждение Поскольку среда выполнения current_thread не создает потоки, она работает только тогда, когда вызывается block_on. Как только block_on возвращает результат, все созданные задачи в этой среде выполнения заморозятся, пока вы снова не вызовете block_on. Используйте среду выполнения multi_threaded, если созданные задачи должны продолжать работать, когда block_on не вызывается.

Как только у нас есть эта структура, большинство методов легко реализовать:

use bytes::Bytes;
use std::time::Duration;

impl BlockingClient {
    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
        self.rt.block_on(self.inner.get(key))
    }

    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
        self.rt.block_on(self.inner.set(key, value))
    }

    pub fn set_expires(
        &mut self,
        key: &str,
        value: Bytes,
        expiration: Duration,
    ) -> crate::Result<()> {
        self.rt.block_on(self.inner.set_expires(key, value, expiration))
    }

    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
        self.rt.block_on(self.inner.publish(channel, message))
    }
}

Метод Client::subscribe более интересен, потому что он преобразует Client в объект Subscriber. Мы можем реализовать его следующим образом:

/// Клиент, который вошел в режим pub/sub.
///
/// Как только клиенты подписываются на канал, они могут выполнять только
/// команды, связанные с pub/sub. Тип `BlockingClient` преобразуется
/// в тип `BlockingSubscriber`, чтобы предотвратить вызов
/// методов, не связанных с pub/sub.
pub struct BlockingSubscriber {
    /// Асинхронный `Subscriber`.
    inner: crate::clients::Subscriber,

    /// Среда выполнения `current_thread` для выполнения операций над
    /// асинхронным клиентом в блокирующем режиме.
    rt: Runtime,
}

impl BlockingClient {
    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
        Ok(BlockingSubscriber {
            inner: subscriber,
            rt: self.rt,
        })
    }
}

impl BlockingSubscriber {
    pub fn get_subscribed(&self) -> &[String] {
        self.inner.get_subscribed()
    }

    pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
        self.rt.block_on(self.inner.next_message())
    }

    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.subscribe(channels))
    }

    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.unsubscribe(channels))
    }
}

Итак, метод subscribe сначала использует среду выполнения для преобразования асинхронного Client в асинхронный Subscriber. Затем он сохранит полученный Subscriber вместе со средой выполнения Runtime и реализует различные методы с помощью block_on.

Обратите внимание, что асинхронная структура Subscriber имеет не-асинхронный метод get_subscribed. Чтобы обработать это, мы просто вызываем его напрямую без участия среды выполнения.

Другие подходы

Вышеуказанный раздел объясняет самый простой способ реализации синхронной обертки, но это не единственный способ. Подходы таковы:

  • Создать Runtime и вызвать block_on на асинхронном коде.
  • Создать Runtime и spawn задачи на нем.
  • Запустить Runtime в отдельном потоке и отправлять ему сообщения.

Мы уже видели первый подход. Два других подхода описаны ниже.

Создание задач на среде выполнения

Объект Runtime имеет метод spawn. Когда вы вызываете этот метод, вы создаете новую фоновую задачу для выполнения в среде выполнения. Например:

use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Делаем что-то времязатратное, пока выполняются фоновые задачи.
    std::thread::sleep(Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Ждем завершения всех задач.
    for handle in handles {
        // Метод `spawn` возвращает `JoinHandle`. `JoinHandle` - это
        // future, поэтому мы можем ждать его с помощью `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

async fn my_bg_task(i: u64) {
    // Вычитая, задачи с большими значениями i спят в течение
    // более короткого времени.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    sleep(Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.

В приведенном выше примере мы создаем 10 фоновых задач в среде выполнения, затем ждем их все. В качестве примера, это может быть хорошим способом реализации фоновых сетевых запросов в графическом приложении, потому что сетевые запросы слишком времязатратны, чтобы запускать их в основном потоке GUI. Вместо этого вы создаете запрос в среде выполнения Tokio, работающей в фоне, и заставляете задачу отправлять информацию обратно в код GUI, когда запрос завершен, или даже постепенно, если вы хотите индикатор выполнения.

В этом примере важно, что среда выполнения настроена как среда выполнения multi_thread. Если вы измените ее на среду выполнения current_thread, вы обнаружите, что времязатратная задача завершается до того, как любая из фоновых задач начнется. Это потому, что фоновые задачи, созданные в среде выполнения current_thread, будут выполняться только во время вызовов block_on, так как в противном случае среда выполнения не имеет места для их запуска.

Пример ждет завершения созданных задач, вызывая block_on на JoinHandle, возвращенном вызовом spawn, но это не единственный способ сделать это. Вот некоторые альтернативы:

  • Использовать канал передачи сообщений, такой как tokio::sync::mpsc.
  • Изменять общее значение, защищенное, например, Mutex. Это может быть хорошим подходом для индикатора выполнения в GUI, где GUI читает общее значение каждый кадр.

Метод spawn также доступен для типа Handle. Тип Handle можно клонировать, чтобы получить много handles к среде выполнения, и каждый Handle можно использовать для создания новых задач в среде выполнения.

Отправка сообщений

Третий метод - создать среду выполнения и использовать передачу сообщений для общения с ней. Это включает немного больше шаблонного кода, чем два других подхода, но это самый гибкий подход. Вы можете найти базовый пример ниже:

#![allow(unused)]
fn main() {
use tokio::runtime::Builder;
use tokio::sync::mpsc;

pub struct Task {
    name: String,
    // информация, описывающая задачу
}

async fn handle_task(task: Task) {
    println!("Got task {}", task.name);
}

#[derive(Clone)]
pub struct TaskSpawner {
    spawn: mpsc::Sender<Task>,
}

impl TaskSpawner {
    pub fn new() -> TaskSpawner {
        // Настраиваем канал для общения.
        let (send, mut recv) = mpsc::channel(16);

        // Строим среду выполнения для нового потока.
        //
        // Среда выполнения создается перед созданием потока
        // для более чистого перенаправления ошибок, если `unwrap()`
        // паникует.
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            rt.block_on(async move {
                while let Some(task) = recv.recv().await {
                    tokio::spawn(handle_task(task));
                }

                // Как только все отправители выйдут из области видимости,
                // вызов `.recv()` вернет None, и он выйдет из
                // цикла while и завершит поток.
            });
        });

        TaskSpawner {
            spawn: send,
        }
    }

    pub fn spawn_task(&self, task: Task) {
        match self.spawn.blocking_send(task) {
            Ok(()) => {},
            Err(_) => panic!("The shared runtime has shut down."),
        }
    }
}
}

Этот пример можно настроить многими способами. Например, вы можете использовать Semaphore для ограничения количества активных задач, или вы можете использовать канал в обратном направлении для отправки ответа создателю. Когда вы создаете среду выполнения таким образом, это тип [актора].