Структура Mutex

#![allow(unused)]
fn main() {
pub struct Mutex<T: ?Sized> { /* приватные поля */ }
}

Примитив взаимного исключения, полезный для защиты общих данных.

Этот мьютекс может блокировать поддержку ожидания вызовов. У каждого мьютекса есть тип, представляющий данные, которые он защищает. Для доступа к этим данным необходимо получить охрану мьютекса через вызов lock или try_lock, который возвращает [MutexGuard]. Охрана поддерживает доступ к данным через типаж Deref и позволяет изменять данные через типаж DerefMut.

Отравление

Мьютексы в этом модуле реализуют стратегию под названием "отравление", при которой мьютекс становится отравленным, если он обнаруживает, что поток, удерживающий его, запаниковал.

Как только мьютекс отравлен, все другие потоки по умолчанию не могут получить доступ к данным, так как они, вероятно, испорчены (некоторый инвариант не соблюдается). Для мьютекса это означает, что методы lock и try_lock возвращают Result, который указывает, был ли мьютекс отравлен или нет. В большинстве случаев использования мьютекса эти результаты просто unwrap(), распространяя паники между потоками, чтобы гарантировать, что возможно недействительный инвариант не будет замечен.

Отравление является только рекомендательным: тип PoisonError имеет метод into_inner, который вернет охрану, которая была бы возвращена при успешной блокировке. Это позволяет получить доступ к данным, несмотря на то, что блокировка отравлена.

Кроме того, обнаружение паники не является идеальным, поэтому даже неотравленные мьютексы нужно обрабатывать с осторожностью, поскольку определенные паники могли быть пропущены. Вот неисчерпывающий список ситуаций, когда это может произойти:

  • Если мьютекс блокируется во время паники, например, в реализации Drop или обработчике паники, паника во второй раз при удержании блокировки оставит мьютекс неотравленным. Заметьте, что хотя двойная паника обычно завершает программу, catch_unwind может предотвратить это.
  • Блокировка и разблокировка мьютекса в разных контекстах паники, например, путем сохранения охраны в Cell внутри Drop::drop и доступа к ней снаружи, или наоборот, может повлиять на статус отравления неожиданным образом.
  • Иностранные исключения в настоящее время не вызывают отравление даже при отсутствии других паник.

Хотя это редко происходит в реальном коде, небезопасный код не может полагаться на отравление для корректности, поскольку поведение отравления может зависеть от внешнего контекста. Вот пример неправильного использования отравления:

#![allow(unused)]
fn main() {
use std::sync::Mutex;

struct MutexBox<T> {
    data: Mutex<*mut T>,
}

impl<T> MutexBox<T> {
    pub fn new(value: T) -> Self {
        Self {
            data: Mutex::new(Box::into_raw(Box::new(value))),
        }
    }

    pub fn replace_with(&self, f: impl FnOnce(T) -> T) {
        let ptr = self.data.lock().expect("poisoned");
        // Пока выполняется `f`, данные перемещаются из `*ptr`. Если `f`
        // запаникует, `*ptr` продолжит указывать на удаленное значение. Намерение
        // состоит в том, что это отравит мьютекс, поэтому следующие вызовы
        // `replace_with` запаникуют без чтения `*ptr`. Но поскольку
        // отравление не гарантируется, если это выполняется из обработчика паники,
        // это может привести к использованию после освобождения.
        unsafe {
            (*ptr).write(f((*ptr).read()));
        }
    }
}
}

Примеры

#![allow(unused)]
fn main() {
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::channel;

const N: usize = 10;

// Создаем несколько потоков для увеличения общей переменной (неатомарно), и
// сообщаем основному потоку, когда все увеличения завершены.
//
// Здесь мы используем Arc для совместного использования памяти между потоками, и данные внутри
// Arc защищены мьютексом.
let data = Arc::new(Mutex::new(0));

let (tx, rx) = channel();
for _ in 0..N {
    let (data, tx) = (Arc::clone(&data), tx.clone());
    thread::spawn(move || {
        // К общему состоянию можно получить доступ только после получения блокировки.
        // Наше неатомарное увеличение безопасно, потому что мы единственный поток,
        // который может получить доступ к общему состоянию, когда блокировка удерживается.
        //
        // Мы используем unwrap() для возвращаемого значения, чтобы утверждать, что мы не ожидаем,
        // что потоки когда-либо завершатся неудачей при удержании блокировки.
        let mut data = data.lock().unwrap();
        *data += 1;
        if *data == N {
            tx.send(()).unwrap();
        }
        // блокировка разблокируется здесь, когда `data` выходит из области видимости.
    });
}

rx.recv().unwrap();
}

Чтобы восстановиться после отравленного мьютекса:

#![allow(unused)]
fn main() {
use std::sync::{Arc, Mutex};
use std::thread;

let lock = Arc::new(Mutex::new(0_u32));
let lock2 = Arc::clone(&lock);

let _ = thread::spawn(move || -> () {
    // Этот поток получит мьютекс первым, разворачивая результат
    // `lock`, потому что блокировка не была отравлена.
    let _guard = lock2.lock().unwrap();

    // Эта паника при удержании блокировки (`_guard` в области видимости) отравит
    // мьютекс.
    panic!();
}).join();

// Блокировка отравлена к этому моменту, но возвращаемый результат может быть
// сопоставлен с образцом, чтобы вернуть базовую охрану в обеих ветвях.
let mut guard = match lock.lock() {
    Ok(guard) => guard,
    Err(poisoned) => poisoned.into_inner(),
};

*guard += 1;
}

Чтобы разблокировать охрану мьютекса раньше, чем конец охватывающей области видимости, либо создайте внутреннюю область видимости, либо вручную удалите охрану.

#![allow(unused)]
fn main() {
use std::sync::{Arc, Mutex};
use std::thread;

const N: usize = 3;

let data_mutex = Arc::new(Mutex::new(vec![1, 2, 3, 4]));
let res_mutex = Arc::new(Mutex::new(0));

let mut threads = Vec::with_capacity(N);
(0..N).for_each(|_| {
    let data_mutex_clone = Arc::clone(&data_mutex);
    let res_mutex_clone = Arc::clone(&res_mutex);

    threads.push(thread::spawn(move || {
        // Здесь мы используем блок для ограничения времени жизни охраны блокировки.
        let result = {
            let mut data = data_mutex_clone.lock().unwrap();
            // Это результат некоторой важной и длительной работы.
            let result = data.iter().fold(0, |acc, x| acc + x * 2);
            data.push(result);
            result
            // Охрана мьютекса удаляется здесь вместе с любыми другими значениями,
            // созданными в критической секции.
        };
        // Охрана, созданная здесь, является временной и удаляется в конце оператора, т.е.
        // блокировка не останется удержанной, даже если поток выполнит дополнительную работу.
        *res_mutex_clone.lock().unwrap() += result;
    }));
});

let mut data = data_mutex.lock().unwrap();
// Это результат некоторой важной и длительной работы.
let result = data.iter().fold(0, |acc, x| acc + x * 2);
data.push(result);
// Мы явно удаляем `data`, потому что это больше не нужно и
// поток все еще имеет работу для выполнения. Это позволяет другим потокам начать работу с
// данными немедленно, не дожидаясь завершения остальной несвязанной работы
// здесь.
//
// Это еще более важно здесь, чем в потоках, потому что мы `.join`
// потоки после этого. Если бы мы не удалили охрану мьютекса, поток мог бы
// ждать ее вечно, вызывая взаимную блокировку.
// Как и в потоках, вместо вызова функции `drop` можно было использовать блок.
drop(data);
// Здесь охрана мьютекса не присваивается переменной, и поэтому, даже если
// область видимости не заканчивается после этой строки, мьютекс все равно освобождается: нет
// взаимной блокировки.
*res_mutex.lock().unwrap() += result;

threads.into_iter().for_each(|thread| {
    thread
        .join()
        .expect("The thread creating or execution failed !")
});

assert_eq!(*res_mutex.lock().unwrap(), 800);
}

Реализации

impl<T> Mutex<T>

pub const fn new(t: T) -> Mutex<T>

Создает новый мьютекс в разблокированном состоянии, готовый к использованию.

pub fn get_cloned(&self) -> Result<T, PoisonError<()>> where T: Clone,

🔬 Это экспериментальное API, доступное только в ночных сборках.

Возвращает содержащееся значение путем его клонирования.

Ошибки: Если другой пользователь этого мьютекса запаниковал, удерживая мьютекс, то этот вызов вернет ошибку.

pub fn set(&self, value: T) -> Result<(), PoisonError<T>>

🔬 Это экспериментальное API, доступное только в ночных сборках.

Устанавливает содержащееся значение.

Ошибки: Если другой пользователь этого мьютекса запаниковал, удерживая мьютекс, то этот вызов вернет ошибку, содержащую предоставленное значение.

pub fn replace(&self, value: T) -> LockResult<T>

🔬 Это экспериментальное API, доступное только в ночных сборках.

Заменяет содержащееся значение на value и возвращает старое содержащееся значение.

Ошибки: Если другой пользователь этого мьютекса запаниковал, удерживая мьютекс, то этот вызов вернет ошибку, содержащую предоставленное значение.

impl<T: ?Sized> Mutex<T>

pub fn lock(&self) -> LockResult<MutexGuard<'_, T>>

Получает мьютекс, блокируя текущий поток до тех пор, пока он не сможет это сделать.

pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>>

Пытается получить эту блокировку.

pub fn is_poisoned(&self) -> bool

Определяет, отравлен ли мьютекс в данный момент.

pub fn clear_poison(&self)

Очищает состояние отравления мьютекса.

pub fn into_inner(self) -> LockResult<T> where T: Sized,

Потребляет этот мьютекс, возвращая базовые данные.

pub fn get_mut(&mut self) -> LockResult<&mut T>

Возвращает изменяемую ссылку на базовые данные.

pub fn data_ptr(&self) -> *mut T

🔬 Это экспериментальное API, доступное только в ночных сборках.

Возвращает сырой указатель на базовые данные.