blob: 26735d1be4af2c3b5f2af4bb69214f9b025bc74b [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::cell::UnsafeCell;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{spin_loop_hint, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::yield_now;
use crate::sync::waiter::{Kind as WaiterKind, Waiter, WaiterAdapter, WaiterList, WaitingFor};
// Set when the mutex is exclusively locked.
const LOCKED: usize = 1 << 0;
// Set when there are one or more threads waiting to acquire the lock.
const HAS_WAITERS: usize = 1 << 1;
// Set when a thread has been woken up from the wait queue. Cleared when that thread either acquires
// the lock or adds itself back into the wait queue. Used to prevent unnecessary wake ups when a
// thread has been removed from the wait queue but has not gotten CPU time yet.
const DESIGNATED_WAKER: usize = 1 << 2;
// Used to provide exclusive access to the `waiters` field in `Mutex`. Should only be held while
// modifying the waiter list.
const SPINLOCK: usize = 1 << 3;
// Set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
// attempting to acquire a shared lock will be preventing from getting it when this bit is set.
// However, this bit is ignored once a thread has gone through the wait queue at least once.
const WRITER_WAITING: usize = 1 << 4;
// Set when a thread has gone through the wait queue many times but has failed to acquire the lock
// every time it is woken up. When this bit is set, all other threads are prevented from acquiring
// the lock until the thread that set the `LONG_WAIT` bit has acquired the lock.
const LONG_WAIT: usize = 1 << 5;
// The bit that is added to the mutex state in order to acquire a shared lock. Since more than one
// thread can acquire a shared lock, we cannot use a single bit. Instead we use all the remaining
// bits in the state to track the number of threads that have acquired a shared lock.
const READ_LOCK: usize = 1 << 8;
// Mask used for checking if any threads currently hold a shared lock.
const READ_MASK: usize = !0xff;
// Mask used to check if the lock is held in either shared or exclusive mode.
const ANY_LOCK: usize = LOCKED | READ_MASK;
// The number of times the thread should just spin and attempt to re-acquire the lock.
const SPIN_THRESHOLD: usize = 7;
// The number of times the thread needs to go through the wait queue before it sets the `LONG_WAIT`
// bit and forces all other threads to wait for it to acquire the lock. This value is set relatively
// high so that we don't lose the benefit of having running threads unless it is absolutely
// necessary.
const LONG_WAIT_THRESHOLD: usize = 19;
// Common methods between shared and exclusive locks.
trait Kind {
// The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
// are not zero then the thread will first spin and retry a few times before adding itself to
// the wait queue.
fn zero_to_acquire() -> usize;
// The bit that must be added in order to acquire this kind of lock. This should either be
// `LOCKED` or `READ_LOCK`.
fn add_to_acquire() -> usize;
// The bits that should be set when a thread adds itself to the wait queue while waiting to
// acquire this kind of lock.
fn set_when_waiting() -> usize;
// The bits that should be cleared when a thread acquires this kind of lock.
fn clear_on_acquire() -> usize;
// The waiter that a thread should use when waiting to acquire this kind of lock.
fn new_waiter(raw: &RawMutex) -> Arc<Waiter>;
}
// A lock type for shared read-only access to the data. More than one thread may hold this kind of
// lock simultaneously.
struct Shared;
impl Kind for Shared {
fn zero_to_acquire() -> usize {
LOCKED | WRITER_WAITING | LONG_WAIT
}
fn add_to_acquire() -> usize {
READ_LOCK
}
fn set_when_waiting() -> usize {
0
}
fn clear_on_acquire() -> usize {
0
}
fn new_waiter(raw: &RawMutex) -> Arc<Waiter> {
Arc::new(Waiter::new(
WaiterKind::Shared,
cancel_waiter,
raw as *const RawMutex as usize,
WaitingFor::Mutex,
))
}
}
// A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
// kind of lock at a time.
struct Exclusive;
impl Kind for Exclusive {
fn zero_to_acquire() -> usize {
LOCKED | READ_MASK | LONG_WAIT
}
fn add_to_acquire() -> usize {
LOCKED
}
fn set_when_waiting() -> usize {
WRITER_WAITING
}
fn clear_on_acquire() -> usize {
WRITER_WAITING
}
fn new_waiter(raw: &RawMutex) -> Arc<Waiter> {
Arc::new(Waiter::new(
WaiterKind::Exclusive,
cancel_waiter,
raw as *const RawMutex as usize,
WaitingFor::Mutex,
))
}
}
// Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
// set in the mutex state when the current thread releases the spin lock protecting the waiter list.
//
// If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
// waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
// found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
// bit, which should be set when the thread releases the spin lock.
//
// If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
// no bits are set in the returned `usize`.
fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
let mut to_wake = WaiterList::new(WaiterAdapter::new());
let mut set_on_release = 0;
let mut cursor = waiters.front_mut();
let mut waking_readers = false;
while let Some(w) = cursor.get() {
match w.kind() {
WaiterKind::Exclusive if !waking_readers => {
// This is the first waiter and it's a writer. No need to check the other waiters.
let waiter = cursor.remove().unwrap();
waiter.set_waiting_for(WaitingFor::None);
to_wake.push_back(waiter);
break;
}
WaiterKind::Shared => {
// This is a reader and the first waiter in the list was not a writer so wake up all
// the readers in the wait list.
let waiter = cursor.remove().unwrap();
waiter.set_waiting_for(WaitingFor::None);
to_wake.push_back(waiter);
waking_readers = true;
}
WaiterKind::Exclusive => {
// We found a writer while looking for more readers to wake up. Set the
// `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
// readers currently in the wait list will ignore this bit since they already waited
// once.
set_on_release |= WRITER_WAITING;
cursor.move_next();
}
}
}
(to_wake, set_on_release)
}
#[inline]
fn cpu_relax(iterations: usize) {
for _ in 0..iterations {
spin_loop_hint();
}
}
pub(crate) struct RawMutex {
state: AtomicUsize,
waiters: UnsafeCell<WaiterList>,
}
impl RawMutex {
pub fn new() -> RawMutex {
RawMutex {
state: AtomicUsize::new(0),
waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
}
}
#[inline]
pub async fn lock(&self) {
match self
.state
.compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => {}
Err(oldstate) => {
// If any bits that should be zero are not zero or if we fail to acquire the lock
// with a single compare_exchange then go through the slow path.
if (oldstate & Exclusive::zero_to_acquire()) != 0
|| self
.state
.compare_exchange_weak(
oldstate,
(oldstate + Exclusive::add_to_acquire())
& !Exclusive::clear_on_acquire(),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
self.lock_slow::<Exclusive>(0, 0).await;
}
}
}
}
#[inline]
pub async fn read_lock(&self) {
match self
.state
.compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => {}
Err(oldstate) => {
if (oldstate & Shared::zero_to_acquire()) != 0
|| self
.state
.compare_exchange_weak(
oldstate,
(oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
self.lock_slow::<Shared>(0, 0).await;
}
}
}
}
// Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
// when the lock is acquired. Any bits set in `zero_mask` are cleared from the bits returned by
// `K::zero_to_acquire()`.
#[cold]
async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
let mut spin_count = 0;
let mut wait_count = 0;
let mut waiter = None;
loop {
let oldstate = self.state.load(Ordering::Relaxed);
// If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
// directly.
if (oldstate & zero_to_acquire) == 0 {
if self
.state
.compare_exchange_weak(
oldstate,
(oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
return;
}
} else if (oldstate & SPINLOCK) == 0 {
// The mutex is locked and the spin lock is available. Try to add this thread
// to the waiter queue.
let w = waiter.get_or_insert_with(|| K::new_waiter(self));
w.reset(WaitingFor::Mutex);
if self
.state
.compare_exchange_weak(
oldstate,
(oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let mut set_on_release = 0;
// Safe because we have acquired the spin lock and it provides exclusive
// access to the waiter queue.
if wait_count < LONG_WAIT_THRESHOLD {
// Add the waiter to the back of the queue.
unsafe { (*self.waiters.get()).push_back(w.clone()) };
} else {
// This waiter has gone through the queue too many times. Put it in the
// front of the queue and block all other threads from acquiring the lock
// until this one has acquired it at least once.
unsafe { (*self.waiters.get()).push_front(w.clone()) };
// Set the LONG_WAIT bit to prevent all other threads from acquiring the
// lock.
set_on_release |= LONG_WAIT;
// Make sure we clear the LONG_WAIT bit when we do finally get the lock.
clear |= LONG_WAIT;
// Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
// from acquiring the lock.
zero_to_acquire &= !LONG_WAIT;
}
// Release the spin lock.
let mut state = oldstate;
loop {
match self.state.compare_exchange_weak(
state,
(state | set_on_release) & !SPINLOCK,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(w) => state = w,
}
}
// Now wait until we are woken.
w.wait().await;
// The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
// thread that originally held the lock. While this bit is set, no other waiters
// will be woken up so it's important to clear it the next time we try to
// acquire the main lock or the spin lock.
clear |= DESIGNATED_WAKER;
// Now that the thread has waited once, we no longer care if there is a writer
// waiting. Only the limits of mutual exclusion can prevent us from acquiring
// the lock.
zero_to_acquire &= !WRITER_WAITING;
// Reset the spin count since we just went through the wait queue.
spin_count = 0;
// Increment the wait count since we went through the wait queue.
wait_count += 1;
// Skip the `cpu_relax` below.
continue;
}
}
// Both the lock and the spin lock are held by one or more other threads. First, we'll
// spin a few times in case we can acquire the lock or the spin lock. If that fails then
// we yield because we might be preventing the threads that do hold the 2 locks from
// getting cpu time.
if spin_count < SPIN_THRESHOLD {
cpu_relax(1 << spin_count);
spin_count += 1;
} else {
yield_now();
}
}
}
#[inline]
pub fn unlock(&self) {
// Fast path, if possible. We can directly clear the locked bit since we have exclusive
// access to the mutex.
let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
// Panic if we just tried to unlock a mutex that wasn't held by this thread. This shouldn't
// really be possible since `unlock` is not a public method.
debug_assert_eq!(
oldstate & READ_MASK,
0,
"`unlock` called on mutex held in read-mode"
);
debug_assert_ne!(
oldstate & LOCKED,
0,
"`unlock` called on mutex not held in write-mode"
);
if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
// The oldstate has waiters but no designated waker has been chosen yet.
self.unlock_slow();
}
}
#[inline]
pub fn read_unlock(&self) {
// Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
// previously added it.
let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
debug_assert_eq!(
oldstate & LOCKED,
0,
"`read_unlock` called on mutex held in write-mode"
);
debug_assert_ne!(
oldstate & READ_MASK,
0,
"`read_unlock` called on mutex not held in read-mode"
);
if (oldstate & HAS_WAITERS) != 0
&& (oldstate & DESIGNATED_WAKER) == 0
&& (oldstate & READ_MASK) == READ_LOCK
{
// There are waiters, no designated waker has been chosen yet, and the last reader is
// unlocking so we have to take the slow path.
self.unlock_slow();
}
}
#[cold]
fn unlock_slow(&self) {
let mut spin_count = 0;
loop {
let oldstate = self.state.load(Ordering::Relaxed);
if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
// No more waiters or a designated waker has been chosen. Nothing left for us to do.
return;
} else if (oldstate & SPINLOCK) == 0 {
// The spin lock is not held by another thread. Try to acquire it. Also set the
// `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
if self
.state
.compare_exchange_weak(
oldstate,
oldstate | SPINLOCK | DESIGNATED_WAKER,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
// Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
// clear the HAS_WAITER and DESIGNATED_WAKER bits so start collecting the bits
// to be cleared.
let mut clear = SPINLOCK;
// Safe because the spinlock guarantees exclusive access to the waiter list and
// the reference does not escape this function.
let waiters = unsafe { &mut *self.waiters.get() };
let (wake_list, set_on_release) = get_wake_list(waiters);
// If the waiter list is now empty, clear the HAS_WAITERS bit.
if waiters.is_empty() {
clear |= HAS_WAITERS;
}
if wake_list.is_empty() {
// Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
// that we set when we acquired the spin lock.
clear |= DESIGNATED_WAKER;
}
// Release the spin lock and clear any other bits as necessary. Also, set any
// bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
// bit, which needs to be set when we are waking up a bunch of readers and there
// are still writers in the wait queue. This will prevent any readers that
// aren't in `wake_list` from acquiring the read lock.
let mut state = oldstate;
loop {
match self.state.compare_exchange_weak(
state,
(state | set_on_release) & !clear,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(w) => state = w,
}
}
// Now wake the waiters, if any.
for w in wake_list {
w.wake();
}
// We're done.
return;
}
}
// Spin and try again. It's ok to block here as we have already released the lock.
if spin_count < SPIN_THRESHOLD {
cpu_relax(1 << spin_count);
spin_count += 1;
} else {
yield_now();
}
}
}
// Transfer waiters from the `Condvar` wait list to the `Mutex` wait list. `all_readers` may
// be set to true if all waiters are waiting to acquire a shared lock but should not be true if
// there is even one waiter waiting on an exclusive lock.
//
// This is similar to what the `FUTEX_CMP_REQUEUE` flag does on linux.
pub fn transfer_waiters(&self, new_waiters: &mut WaiterList, all_readers: bool) {
if new_waiters.is_empty() {
return;
}
let mut oldstate = self.state.load(Ordering::Relaxed);
let can_acquire_read_lock = (oldstate & Shared::zero_to_acquire()) == 0;
// The lock needs to be held in some mode or else the waiters we transfer now may never get
// woken up. Additionally, if all the new waiters are readers and can acquire the lock now
// then we can just wake them up.
if (oldstate & ANY_LOCK) == 0 || (all_readers && can_acquire_read_lock) {
// Nothing to do here. The Condvar will wake up all the waiters left in `new_waiters`.
return;
}
if (oldstate & SPINLOCK) == 0
&& self
.state
.compare_exchange_weak(
oldstate,
oldstate | SPINLOCK | HAS_WAITERS,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let mut transferred_writer = false;
// Safe because the spin lock guarantees exclusive access and the reference does not
// escape this function.
let waiters = unsafe { &mut *self.waiters.get() };
let mut current = new_waiters.front_mut();
while let Some(w) = current.get() {
match w.kind() {
WaiterKind::Shared => {
if can_acquire_read_lock {
current.move_next();
} else {
// We need to update the cancellation function since we're moving this
// waiter into our queue. Also update the waiting to indicate that it is
// now in the Mutex's waiter list.
let w = current.remove().unwrap();
w.set_cancel(cancel_waiter, self as *const RawMutex as usize);
w.set_waiting_for(WaitingFor::Mutex);
waiters.push_back(w);
}
}
WaiterKind::Exclusive => {
transferred_writer = true;
// We need to update the cancellation function since we're moving this
// waiter into our queue. Also update the waiting to indicate that it is
// now in the Mutex's waiter list.
let w = current.remove().unwrap();
w.set_cancel(cancel_waiter, self as *const RawMutex as usize);
w.set_waiting_for(WaitingFor::Mutex);
waiters.push_back(w);
}
}
}
let set_on_release = if transferred_writer {
WRITER_WAITING
} else {
0
};
// If we didn't actually transfer any waiters, clear the HAS_WAITERS bit that we set
// earlier when we acquired the spin lock.
let clear = if waiters.is_empty() {
SPINLOCK | HAS_WAITERS
} else {
SPINLOCK
};
while self
.state
.compare_exchange_weak(
oldstate,
(oldstate | set_on_release) & !clear,
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
spin_loop_hint();
oldstate = self.state.load(Ordering::Relaxed);
}
}
// The Condvar will wake up any waiters still left in the queue.
}
fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) -> bool {
let mut oldstate = self.state.load(Ordering::Relaxed);
while oldstate & SPINLOCK != 0
|| self
.state
.compare_exchange_weak(
oldstate,
oldstate | SPINLOCK,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_err()
{
spin_loop_hint();
oldstate = self.state.load(Ordering::Relaxed);
}
// Safe because the spin lock provides exclusive access and the reference does not escape
// this function.
let waiters = unsafe { &mut *self.waiters.get() };
let mut clear = SPINLOCK;
// If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
// bit. Also clear the bit if we are going to be waking some other waiters. In this case the
// waiter that set the bit may have already been removed from the waiter list (and could be
// the one that is currently being dropped). If it is still in the waiter list then clearing
// this bit may starve it for one more iteration through the lock_slow() loop, whereas not
// clearing this bit could cause a deadlock if the waiter that set it is the one that is
// being dropped.
if wake_next
|| waiters
.front()
.get()
.map(|front| front as *const Waiter == waiter as *const Waiter)
.unwrap_or(false)
{
clear |= LONG_WAIT;
}
// Don't drop the old waiter while holding the spin lock.
let old_waiter = if waiter.is_linked() && waiter.is_waiting_for() == WaitingFor::Mutex {
// We know that the waitir is still linked and is waiting for the mutex, which
// guarantees that it is still linked into `self.waiters`.
let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
cursor.remove()
} else {
None
};
let (wake_list, set_on_release) =
if wake_next || waiter.is_waiting_for() == WaitingFor::None {
// Either the waiter was already woken or it's been removed from the mutex's waiter
// list and is going to be woken. Either way, we need to wake up another thread.
get_wake_list(waiters)
} else {
(WaiterList::new(WaiterAdapter::new()), 0)
};
if waiters.is_empty() {
clear |= HAS_WAITERS;
}
if wake_list.is_empty() {
// We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
// case this leads to an additional thread being woken up but we risk a deadlock if we
// don't clear it.
clear |= DESIGNATED_WAKER;
}
if let WaiterKind::Exclusive = waiter.kind() {
// The waiter being dropped is a writer so clear the writer waiting bit for now. If we
// found more writers in the list while fetching waiters to wake up then this bit will
// be set again via `set_on_release`.
clear |= WRITER_WAITING;
}
while self
.state
.compare_exchange_weak(
oldstate,
(oldstate & !clear) | set_on_release,
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
spin_loop_hint();
oldstate = self.state.load(Ordering::Relaxed);
}
for w in wake_list {
w.wake();
}
mem::drop(old_waiter);
// Canceling a waker is always successful.
true
}
}
unsafe impl Send for RawMutex {}
unsafe impl Sync for RawMutex {}
fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) -> bool {
let raw_mutex = raw as *const RawMutex;
// Safe because the thread that owns the waiter that is being canceled must
// also own a reference to the mutex, which ensures that this pointer is
// valid.
unsafe { (*raw_mutex).cancel_waiter(waiter, wake_next) }
}
/// A high-level primitive that provides safe, mutable access to a shared resource.
///
/// Unlike more traditional mutexes, `Mutex` can safely provide both shared, immutable access (via
/// `read_lock()`) as well as exclusive, mutable access (via `lock()`) to an underlying resource
/// with no loss of performance.
///
/// # Poisoning
///
/// `Mutex` does not support lock poisoning so if a thread panics while holding the lock, the
/// poisoned data will be accessible by other threads in your program. If you need to guarantee that
/// other threads cannot access poisoned data then you may wish to wrap this `Mutex` inside another
/// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
/// example of this.
///
///
/// # Fairness
///
/// This `Mutex` implementation does not guarantee that threads will acquire the lock in the same
/// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
/// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
/// threads will fail to acquire the lock until the starved thread has acquired it.
///
/// Similarly, this `Mutex` will attempt to balance reader and writer threads: once there is a
/// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
/// However, any reader threads that were already waiting will still be allowed to acquire it.
///
/// # Examples
///
/// ```edition2018
/// use std::sync::Arc;
/// use std::thread;
/// use std::sync::mpsc::channel;
///
/// use libchromeos::sync::{block_on, Mutex};
///
/// const N: usize = 10;
///
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among threads, and the data inside
/// // the Arc is protected with a mutex.
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
/// for _ in 0..N {
/// let (data, tx) = (Arc::clone(&data), tx.clone());
/// thread::spawn(move || {
/// // The shared state can only be accessed once the lock is held.
/// // Our non-atomic increment is safe because we're the only thread
/// // which can access the shared state when the lock is held.
/// let mut data = block_on(data.lock());
/// *data += 1;
/// if *data == N {
/// tx.send(()).unwrap();
/// }
/// // the lock is unlocked here when `data` goes out of scope.
/// });
/// }
///
/// rx.recv().unwrap();
/// ```
pub struct Mutex<T: ?Sized> {
raw: RawMutex,
value: UnsafeCell<T>,
}
impl<T> Mutex<T> {
/// Create a new, unlocked `Mutex` ready for use.
pub fn new(v: T) -> Mutex<T> {
Mutex {
raw: RawMutex::new(),
value: UnsafeCell::new(v),
}
}
/// Consume the `Mutex` and return the contained value. This method does not perform any locking
/// as the compiler will guarantee that there are no other references to `self` and the caller
/// owns the `Mutex`.
pub fn into_inner(self) -> T {
// Don't need to acquire the lock because the compiler guarantees that there are
// no references to `self`.
self.value.into_inner()
}
}
impl<T: ?Sized> Mutex<T> {
/// Acquires exclusive, mutable access to the resource protected by the `Mutex`, blocking the
/// current thread until it is able to do so. Upon returning, the current thread will be the
/// only thread with access to the resource. The `Mutex` will be released when the returned
/// `MutexGuard` is dropped.
///
/// Calling `lock()` while holding a `MutexGuard` or a `MutexReadGuard` will cause a deadlock.
///
/// Callers that are not in an async context may wish to use the `block_on` method to block the
/// thread until the `Mutex` is acquired.
#[inline]
pub async fn lock(&self) -> MutexGuard<'_, T> {
self.raw.lock().await;
// Safe because we have exclusive access to `self.value`.
MutexGuard {
mu: self,
value: unsafe { &mut *self.value.get() },
}
}
/// Acquires shared, immutable access to the resource protected by the `Mutex`, blocking the
/// current thread until it is able to do so. Upon returning there may be other threads that
/// also have immutable access to the resource but there will not be any threads that have
/// mutable access to the resource. When the returned `MutexReadGuard` is dropped the thread
/// releases its access to the resource.
///
/// Calling `read_lock()` while holding a `MutexReadGuard` may deadlock. Calling `read_lock()`
/// while holding a `MutexGuard` will deadlock.
///
/// Callers that are not in an async context may wish to use the `block_on` method to block the
/// thread until the `Mutex` is acquired.
#[inline]
pub async fn read_lock(&self) -> MutexReadGuard<'_, T> {
self.raw.read_lock().await;
// Safe because we have shared read-only access to `self.value`.
MutexReadGuard {
mu: self,
value: unsafe { &*self.value.get() },
}
}
// Called from `Condvar::wait` when the thread wants to reacquire the lock. Since we may
// directly transfer waiters from the `Condvar` wait list to the `Mutex` wait list (see
// `transfer_all` below), we cannot call `Mutex::lock` as we also need to clear the
// `DESIGNATED_WAKER` bit when acquiring the lock. Not doing so will prevent us from waking up
// any other threads in the wait list.
#[inline]
pub(crate) async fn lock_from_cv(&self) -> MutexGuard<'_, T> {
self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
// Safe because we have exclusive access to `self.value`.
MutexGuard {
mu: self,
value: unsafe { &mut *self.value.get() },
}
}
// Like `lock_from_cv` but for acquiring a shared lock.
#[inline]
pub(crate) async fn read_lock_from_cv(&self) -> MutexReadGuard<'_, T> {
// Threads that have waited in the Condvar's waiter list don't have to care if there is a
// writer waiting. This also prevents a deadlock in the following case:
//
// * Thread A holds a write lock.
// * Thread B is in the mutex's waiter list, also waiting on a write lock.
// * Threads C, D, and E are in the condvar's waiter list. C and D want a read lock; E
// wants a write lock.
// * A calls `cv.notify_all()` while still holding the lock, which transfers C, D, and E
// onto the mutex's wait list.
// * A releases the lock, which wakes up B.
// * B acquires the lock, does some work, and releases the lock. This wakes up C and D.
// However, when iterating through the waiter list we find E, which is waiting for a
// write lock so we set the WRITER_WAITING bit.
// * C and D go through this function to acquire the lock. If we didn't clear the
// WRITER_WAITING bit from the zero_to_acquire set then it would prevent C and D from
// acquiring the lock and they would add themselves back into the waiter list.
// * Now C, D, and E will sit in the waiter list indefinitely unless some other thread
// comes along and acquires the lock. On release, it would wake up E and everything would
// go back to normal.
self.raw
.lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
.await;
// Safe because we have exclusive access to `self.value`.
MutexReadGuard {
mu: self,
value: unsafe { &*self.value.get() },
}
}
#[inline]
fn unlock(&self) {
self.raw.unlock();
}
#[inline]
fn read_unlock(&self) {
self.raw.read_unlock();
}
pub fn get_mut(&mut self) -> &mut T {
// Safe because the compiler statically guarantees that are no other references to `self`.
// This is also why we don't need to acquire the lock first.
unsafe { &mut *self.value.get() }
}
}
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
impl<T: ?Sized + Default> Default for Mutex<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T> From<T> for Mutex<T> {
fn from(source: T) -> Self {
Self::new(source)
}
}
/// An RAII implementation of a "scoped exclusive lock" for a `Mutex`. When this structure is
/// dropped, the lock will be released. The resource protected by the `Mutex` can be accessed via
/// the `Deref` and `DerefMut` implementations of this structure.
pub struct MutexGuard<'a, T: ?Sized + 'a> {
mu: &'a Mutex<T>,
value: &'a mut T,
}
impl<'a, T: ?Sized> MutexGuard<'a, T> {
pub(crate) fn into_inner(self) -> &'a Mutex<T> {
self.mu
}
pub(crate) fn as_raw_mutex(&self) -> &RawMutex {
&self.mu.raw
}
}
impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value
}
}
impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value
}
}
impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mu.unlock()
}
}
/// An RAII implementation of a "scoped shared lock" for a `Mutex`. When this structure is dropped,
/// the lock will be released. The resource protected by the `Mutex` can be accessed via the `Deref`
/// implementation of this structure.
pub struct MutexReadGuard<'a, T: ?Sized + 'a> {
mu: &'a Mutex<T>,
value: &'a T,
}
impl<'a, T: ?Sized> MutexReadGuard<'a, T> {
pub(crate) fn into_inner(self) -> &'a Mutex<T> {
self.mu
}
pub(crate) fn as_raw_mutex(&self) -> &RawMutex {
&self.mu.raw
}
}
impl<'a, T: ?Sized> Deref for MutexReadGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value
}
}
impl<'a, T: ?Sized> Drop for MutexReadGuard<'a, T> {
fn drop(&mut self) {
self.mu.read_unlock()
}
}
#[cfg(test)]
mod test {
use super::*;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::Duration;
use futures::channel::oneshot;
use futures::task::{waker_ref, ArcWake};
use futures::{pending, select, FutureExt};
use futures_executor::{LocalPool, ThreadPool};
use futures_util::task::LocalSpawnExt;
use crate::sync::{block_on, Condvar, SpinLock};
#[derive(Debug, Eq, PartialEq)]
struct NonCopy(u32);
// Dummy waker used when we want to manually drive futures.
struct TestWaker;
impl ArcWake for TestWaker {
fn wake_by_ref(_arc_self: &Arc<Self>) {}
}
#[test]
fn it_works() {
let mu = Mutex::new(NonCopy(13));
assert_eq!(*block_on(mu.lock()), NonCopy(13));
}
#[test]
fn smoke() {
let mu = Mutex::new(NonCopy(7));
mem::drop(block_on(mu.lock()));
mem::drop(block_on(mu.lock()));
}
#[test]
fn rw_smoke() {
let mu = Mutex::new(NonCopy(7));
mem::drop(block_on(mu.lock()));
mem::drop(block_on(mu.read_lock()));
mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
mem::drop(block_on(mu.lock()));
}
#[test]
fn async_smoke() {
async fn lock(mu: Rc<Mutex<NonCopy>>) {
mu.lock().await;
}
async fn read_lock(mu: Rc<Mutex<NonCopy>>) {
mu.read_lock().await;
}
async fn double_read_lock(mu: Rc<Mutex<NonCopy>>) {
let first = mu.read_lock().await;
mu.read_lock().await;
// Make sure first lives past the second read lock.
first.as_raw_mutex();
}
let mu = Rc::new(Mutex::new(NonCopy(7)));
let mut ex = LocalPool::new();
let spawner = ex.spawner();
spawner
.spawn_local(lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(read_lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(double_read_lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
spawner
.spawn_local(lock(Rc::clone(&mu)))
.expect("Failed to spawn future");
ex.run();
}
#[test]
fn send() {
let mu = Mutex::new(NonCopy(19));
thread::spawn(move || {
let value = block_on(mu.lock());
assert_eq!(*value, NonCopy(19));
})
.join()
.unwrap();
}
#[test]
fn arc_nested() {
// Tests nested mutexes and access to underlying data.
let mu = Mutex::new(1);
let arc = Arc::new(Mutex::new(mu));
thread::spawn(move || {
let nested = block_on(arc.lock());
let lock2 = block_on(nested.lock());
assert_eq!(*lock2, 1);
})
.join()
.unwrap();
}
#[test]
fn arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1));
let arc2 = arc.clone();
thread::spawn(move || {
struct Unwinder {
i: Arc<Mutex<i32>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*block_on(self.i.lock()) += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
})
.join()
.expect_err("thread did not panic");
let lock = block_on(arc.lock());
assert_eq!(*lock, 2);
}
#[test]
fn unsized_value() {
let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]);
{
let b = &mut *block_on(mutex.lock());
b[0] = 4;
b[2] = 5;
}
let expected: &[i32] = &[4, 2, 5];
assert_eq!(&*block_on(mutex.lock()), expected);
}
#[test]
fn high_contention() {
const THREADS: usize = 17;
const ITERATIONS: usize = 103;
let mut threads = Vec::with_capacity(THREADS);
let mu = Arc::new(Mutex::new(0usize));
for _ in 0..THREADS {
let mu2 = mu.clone();
threads.push(thread::spawn(move || {
for _ in 0..ITERATIONS {
*block_on(mu2.lock()) += 1;
}
}));
}
for t in threads.into_iter() {
t.join().unwrap();
}
assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn high_contention_with_cancel() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
async fn increment(mu: Arc<Mutex<usize>>, alt_mu: Arc<Mutex<usize>>, tx: Sender<()>) {
for _ in 0..ITERATIONS {
select! {
mut count = mu.lock().fuse() => *count += 1,
mut count = alt_mu.lock().fuse() => *count += 1,
}
}
tx.send(()).expect("Failed to send completion signal");
}
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let mu = Arc::new(Mutex::new(0usize));
let alt_mu = Arc::new(Mutex::new(0usize));
let (tx, rx) = channel();
for _ in 0..TASKS {
ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
}
for _ in 0..TASKS {
if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
panic!("Error while waiting for threads to complete: {}", e);
}
}
assert_eq!(
*block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
TASKS * ITERATIONS
);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn single_thread_async() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
// Async closures are unstable.
async fn increment(mu: Rc<Mutex<usize>>) {
for _ in 0..ITERATIONS {
*mu.lock().await += 1;
}
}
let mut ex = LocalPool::new();
let spawner = ex.spawner();
let mu = Rc::new(Mutex::new(0usize));
for _ in 0..TASKS {
spawner
.spawn_local(increment(Rc::clone(&mu)))
.expect("Failed to spawn task");
}
ex.run();
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn multi_thread_async() {
const TASKS: usize = 17;
const ITERATIONS: usize = 103;
// Async closures are unstable.
async fn increment(mu: Arc<Mutex<usize>>, tx: Sender<()>) {
for _ in 0..ITERATIONS {
*mu.lock().await += 1;
}
tx.send(()).expect("Failed to send completion signal");
}
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let mu = Arc::new(Mutex::new(0usize));
let (tx, rx) = channel();
for _ in 0..TASKS {
ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
}
for _ in 0..TASKS {
rx.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion signal");
}
assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn get_mut() {
let mut mu = Mutex::new(NonCopy(13));
*mu.get_mut() = NonCopy(17);
assert_eq!(mu.into_inner(), NonCopy(17));
}
#[test]
fn into_inner() {
let mu = Mutex::new(NonCopy(29));
assert_eq!(mu.into_inner(), NonCopy(29));
}
#[test]
fn into_inner_drop() {
struct NeedsDrop(Arc<AtomicUsize>);
impl Drop for NeedsDrop {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::AcqRel);
}
}
let value = Arc::new(AtomicUsize::new(0));
let needs_drop = Mutex::new(NeedsDrop(value.clone()));
assert_eq!(value.load(Ordering::Acquire), 0);
{
let inner = needs_drop.into_inner();
assert_eq!(inner.0.load(Ordering::Acquire), 0);
}
assert_eq!(value.load(Ordering::Acquire), 1);
}
#[test]
fn rw_arc() {
const THREADS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Arc::new(Mutex::new(0isize));
let mu2 = mu.clone();
let (tx, rx) = channel();
thread::spawn(move || {
let mut guard = block_on(mu2.lock());
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
thread::yield_now();
*guard = tmp + 1;
}
tx.send(()).unwrap();
});
let mut readers = Vec::with_capacity(10);
for _ in 0..THREADS {
let mu3 = mu.clone();
let handle = thread::spawn(move || {
let guard = block_on(mu3.read_lock());
assert!(*guard >= 0);
});
readers.push(handle);
}
// Wait for the readers to finish their checks.
for r in readers {
r.join().expect("One or more readers saw a negative value");
}
// Wait for the writer to finish.
rx.recv_timeout(Duration::from_secs(5)).unwrap();
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn rw_single_thread_async() {
// A Future that returns `Poll::pending` the first time it is polled and `Poll::Ready` every
// time after that.
struct TestFuture {
polled: bool,
waker: Arc<SpinLock<Option<Waker>>>,
}
impl Future for TestFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.polled {
Poll::Ready(())
} else {
self.polled = true;
*self.waker.lock() = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
loop {
if let Some(waker) = waker.lock().take() {
waker.wake();
return;
} else {
thread::sleep(Duration::from_millis(10));
}
}
}
async fn writer(mu: Rc<Mutex<isize>>) {
let mut guard = mu.lock().await;
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
let waker = Arc::new(SpinLock::new(None));
let waker2 = Arc::clone(&waker);
thread::spawn(move || wake_future(waker2));
let fut = TestFuture {
polled: false,
waker,
};
fut.await;
*guard = tmp + 1;
}
}
async fn reader(mu: Rc<Mutex<isize>>) {
let guard = mu.read_lock().await;
assert!(*guard >= 0);
}
const TASKS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Rc::new(Mutex::new(0isize));
let mut ex = LocalPool::new();
let spawner = ex.spawner();
spawner
.spawn_local(writer(Rc::clone(&mu)))
.expect("Failed to spawn writer");
for _ in 0..TASKS {
spawner
.spawn_local(reader(Rc::clone(&mu)))
.expect("Failed to spawn reader");
}
ex.run();
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn rw_multi_thread_async() {
async fn writer(mu: Arc<Mutex<isize>>, tx: Sender<()>) {
let mut guard = mu.lock().await;
for _ in 0..ITERATIONS {
let tmp = *guard;
*guard = -1;
thread::yield_now();
*guard = tmp + 1;
}
mem::drop(guard);
tx.send(()).unwrap();
}
async fn reader(mu: Arc<Mutex<isize>>, tx: Sender<()>) {
let guard = mu.read_lock().await;
assert!(*guard >= 0);
mem::drop(guard);
tx.send(()).expect("Failed to send completion message");
}
const TASKS: isize = 7;
const ITERATIONS: isize = 13;
let mu = Arc::new(Mutex::new(0isize));
let ex = ThreadPool::new().expect("Failed to create ThreadPool");
let (txw, rxw) = channel();
ex.spawn_ok(writer(Arc::clone(&mu), txw));
let (txr, rxr) = channel();
for _ in 0..TASKS {
ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
}
// Wait for the readers to finish their checks.
for _ in 0..TASKS {
rxr.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion message from reader");
}
// Wait for the writer to finish.
rxw.recv_timeout(Duration::from_secs(5))
.expect("Failed to receive completion message from writer");
assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn wake_all_readers() {
async fn read(mu: Arc<Mutex<()>>) {
let g = mu.read_lock().await;
pending!();
mem::drop(g);
}
async fn write(mu: Arc<Mutex<()>>) {
mu.lock().await;
}
let mu = Arc::new(Mutex::new(()));
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
Box::pin(read(mu.clone())),
Box::pin(read(mu.clone())),
Box::pin(read(mu.clone())),
Box::pin(write(mu.clone())),
Box::pin(read(mu.clone())),
];
const NUM_READERS: usize = 4;
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
// Acquire the lock so that the futures cannot get it.
let g = block_on(mu.lock());
for r in &mut futures {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
// Drop the lock. This should allow all readers to make progress. Since they already waited
// once they should ignore the WRITER_WAITING bit that is currently set.
mem::drop(g);
for r in &mut futures {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
// Check that all readers were able to acquire the lock.
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
READ_LOCK * NUM_READERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
let mut needs_poll = None;
// All the readers can now finish but the writer needs to be polled again.
for (i, r) in futures.iter_mut().enumerate() {
match r.as_mut().poll(&mut cx) {
Poll::Ready(()) => {}
Poll::Pending => {
if needs_poll.is_some() {
panic!("More than one future unable to complete");
}
needs_poll = Some(i);
}
}
}
if let Poll::Pending = futures[needs_poll.expect("Writer unexpectedly able to complete")]
.as_mut()
.poll(&mut cx)
{
panic!("Writer unable to complete");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn long_wait() {
async fn tight_loop(mu: Arc<Mutex<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<Mutex<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(Mutex::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
// This time the tight loop will fail to acquire the lock.
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
// Which will finally allow the mark_ready function to make progress.
if let Poll::Pending = mark.as_mut().poll(&mut cx) {
panic!("mark_ready not able to make progress");
}
// Now the tight loop will finish.
if let Poll::Pending = tl.as_mut().poll(&mut cx) {
panic!("tight_loop not able to finish");
}
assert!(*block_on(mu.lock()));
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_long_wait_before_wake() {
async fn tight_loop(mu: Arc<Mutex<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<Mutex<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(Mutex::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
// Now drop the mark_ready future, which should clear the LONG_WAIT bit.
mem::drop(mark);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);
mem::drop(tl);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_long_wait_after_wake() {
async fn tight_loop(mu: Arc<Mutex<bool>>) {
loop {
let ready = mu.lock().await;
if *ready {
break;
}
pending!();
}
}
async fn mark_ready(mu: Arc<Mutex<bool>>) {
*mu.lock().await = true;
}
let mu = Arc::new(Mutex::new(false));
let mut tl = Box::pin(tight_loop(mu.clone()));
let mut mark = Box::pin(mark_ready(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
for _ in 0..=LONG_WAIT_THRESHOLD {
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
panic!("mark_ready unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
);
// This time the tight loop will fail to acquire the lock.
if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
panic!("tight_loop unexpectedly ready");
}
// Now drop the mark_ready future, which should clear the LONG_WAIT bit.
mem::drop(mark);
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);
// Since the lock is not held, we should be able to spawn a future to set the ready flag.
block_on(mark_ready(mu.clone()));
// Now the tight loop will finish.
if let Poll::Pending = tl.as_mut().poll(&mut cx) {
panic!("tight_loop not able to finish");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn designated_waker() {
async fn inc(mu: Arc<Mutex<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(Mutex::new(0));
let mut futures = [
Box::pin(inc(mu.clone())),
Box::pin(inc(mu.clone())),
Box::pin(inc(mu.clone())),
];
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
// Poll 2 futures. Since neither will be able to acquire the lock, they should get added to
// the waiter list.
if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
LOCKED | HAS_WAITERS | WRITER_WAITING,
);
// Now drop the lock. This should set the DESIGNATED_WAKER bit and wake up the first future
// in the wait list.
mem::drop(count);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
);
// Now poll the third future. It should be able to acquire the lock immediately.
if let Poll::Pending = futures[2].as_mut().poll(&mut cx) {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 1);
// There should still be a waiter in the wait list and the DESIGNATED_WAKER bit should still
// be set.
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
DESIGNATED_WAKER
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
// Now let the future that was woken up run.
if let Poll::Pending = futures[0].as_mut().poll(&mut cx) {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 2);
if let Poll::Pending = futures[1].as_mut().poll(&mut cx) {
panic!("future unable to complete");
}
assert_eq!(*block_on(mu.lock()), 3);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_designated_waker() {
async fn inc(mu: Arc<Mutex<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(Mutex::new(0));
let mut fut = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let count = block_on(mu.lock());
if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
panic!("Future unexpectedly ready when lock is held");
}
// Drop the lock. This will wake up the future.
mem::drop(count);
// Now drop the future without polling. This should clear all the state in the mutex.
mem::drop(fut);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_before_wake() {
async fn inc(mu: Arc<Mutex<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(Mutex::new(0));
let mut fut1 = Box::pin(inc(mu.clone()));
let mut fut2 = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
// First acquire the lock.
let count = block_on(mu.lock());
// Now poll the futures. Since the lock is acquired they will both get queued in the waiter
// list.
match fut1.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
// Drop fut1. This should remove it from the waiter list but shouldn't wake fut2.
mem::drop(fut1);
// There should be no designated waker.
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);
// Since the waiter was a writer, we should clear the WRITER_WAITING bit.
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
// Now drop the lock. This should mark fut2 as ready to make progress.
mem::drop(count);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => panic!("Future is not ready to make progress"),
Poll::Ready(()) => {}
}
// Verify that we only incremented the count once.
assert_eq!(*block_on(mu.lock()), 1);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn cancel_after_wake() {
async fn inc(mu: Arc<Mutex<usize>>) {
*mu.lock().await += 1;
}
let mu = Arc::new(Mutex::new(0));
let mut fut1 = Box::pin(inc(mu.clone()));
let mut fut2 = Box::pin(inc(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
// First acquire the lock.
let count = block_on(mu.lock());
// Now poll the futures. Since the lock is acquired they will both get queued in the waiter
// list.
match fut1.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("Future is unexpectedly ready"),
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
// Drop the lock. This should mark fut1 as ready to make progress.
mem::drop(count);
// Now drop fut1. This should make fut2 ready to make progress.
mem::drop(fut1);
// Since there was still another waiter in the list we shouldn't have cleared the
// DESIGNATED_WAKER bit.
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
DESIGNATED_WAKER
);
// Since the waiter was a writer, we should clear the WRITER_WAITING bit.
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
match fut2.as_mut().poll(&mut cx) {
Poll::Pending => panic!("Future is not ready to make progress"),
Poll::Ready(()) => {}
}
// Verify that we only incremented the count once.
assert_eq!(*block_on(mu.lock()), 1);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn timeout() {
async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<Mutex<()>>) {
select! {
res = timer.fuse() => {
match res {
Ok(()) => {},
Err(e) => panic!("Timer unexpectedly canceled: {}", e),
}
}
_ = mu.lock().fuse() => panic!("Successfuly acquired lock"),
}
}
let mu = Arc::new(Mutex::new(()));
let (tx, rx) = oneshot::channel();
let mut timeout = Box::pin(timed_lock(rx, mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
// Acquire the lock.
let g = block_on(mu.lock());
// Poll the future.
if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
panic!("timed_lock unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
// Signal the channel, which should cancel the lock.
tx.send(()).expect("Failed to send wakeup");
// Now the future should have completed without acquiring the lock.
if let Poll::Pending = timeout.as_mut().poll(&mut cx) {
panic!("timed_lock not ready after timeout");
}
// The mutex state should not show any waiters.
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
mem::drop(g);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn writer_waiting() {
async fn read_zero(mu: Arc<Mutex<usize>>) {
let val = mu.read_lock().await;
pending!();
assert_eq!(*val, 0);
}
async fn inc(mu: Arc<Mutex<usize>>) {
*mu.lock().await += 1;
}
async fn read_one(mu: Arc<Mutex<usize>>) {
let val = mu.read_lock().await;
assert_eq!(*val, 1);
}
let mu = Arc::new(Mutex::new(0));
let mut r1 = Box::pin(read_zero(mu.clone()));
let mut r2 = Box::pin(read_zero(mu.clone()));
let mut w = Box::pin(inc(mu.clone()));
let mut r3 = Box::pin(read_one(mu.clone()));
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
panic!("read_zero unexpectedly ready");
}
if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
panic!("read_zero unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2 * READ_LOCK
);
if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
panic!("inc unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
// The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
panic!("read_one unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2 * READ_LOCK
);
if let Poll::Pending = r1.as_mut().poll(&mut cx) {
panic!("read_zero unable to complete");
}
if let Poll::Pending = r2.as_mut().poll(&mut cx) {
panic!("read_zero unable to complete");
}
if let Poll::Pending = w.as_mut().poll(&mut cx) {
panic!("inc unable to complete");
}
if let Poll::Pending = r3.as_mut().poll(&mut cx) {
panic!("read_one unable to complete");
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn transfer_notify_one() {
async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
}
async fn write(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
let mu = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut readers = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
let mut writer = Box::pin(write(mu.clone(), cv.clone()));
for r in &mut readers {
if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
panic!("reader unexpectedly ready");
}
}
if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
panic!("writer unexpectedly ready");
}
let mut count = block_on(mu.lock());
*count = 1;
// This should transfer all readers + one writer to the waiter queue.
cv.notify_one();
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
HAS_WAITERS
);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
WRITER_WAITING
);
mem::drop(count);
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
HAS_WAITERS | WRITER_WAITING
);
for r in &mut readers {
if let Poll::Pending = r.as_mut().poll(&mut cx) {
panic!("reader unable to complete");
}
}
if let Poll::Pending = writer.as_mut().poll(&mut cx) {
panic!("writer unable to complete");
}
assert_eq!(*block_on(mu.read_lock()), 0);
}
#[test]
fn transfer_waiters_when_unlocked() {
async fn dec(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
let mu = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures = [
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
Box::pin(dec(mu.clone(), cv.clone())),
];
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
*block_on(mu.lock()) = futures.len();
cv.notify_all();
// Since the lock is not held, instead of transferring the waiters to the waiter list we
// should just wake them all up.
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
for f in &mut futures {
if let Poll::Pending = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn transfer_reader_writer() {
async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
// Yield once while holding the read lock, which should prevent the writer from waking
// up.
pending!();
}
async fn write(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.lock().await;
while *count == 0 {
count = cv.wait(count).await;
}
*count -= 1;
}
async fn lock(mu: Arc<Mutex<usize>>) {
mem::drop(mu.lock().await);
}
let mu = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(write(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
const NUM_READERS: usize = 4;
let mut l = Box::pin(lock(mu.clone()));
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
let mut count = block_on(mu.lock());
*count = 1;
// Now poll the lock function. Since the lock is held by us, it will get queued on the
// waiter list.
if let Poll::Ready(()) = l.as_mut().poll(&mut cx) {
panic!("lock() unexpectedly ready");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
HAS_WAITERS | WRITER_WAITING
);
// Wake up waiters while holding the lock. This should end with them transferred to the
// mutex's waiter list.
cv.notify_all();
// Drop the lock. This should wake up the lock function.
mem::drop(count);
if let Poll::Pending = l.as_mut().poll(&mut cx) {
panic!("lock() unable to complete");
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
HAS_WAITERS | WRITER_WAITING
);
// Poll everything again. The readers should be able to make progress (but not complete) but
// the writer should be blocked.
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
READ_LOCK * NUM_READERS
);
// All the readers can now finish but the writer needs to be polled again.
let mut needs_poll = None;
for (i, r) in futures.iter_mut().enumerate() {
match r.as_mut().poll(&mut cx) {
Poll::Ready(()) => {}
Poll::Pending => {
if needs_poll.is_some() {
panic!("More than one future unable to complete");
}
needs_poll = Some(i);
}
}
}
if let Poll::Pending = futures[needs_poll.expect("Writer unexpectedly able to complete")]
.as_mut()
.poll(&mut cx)
{
panic!("Writer unable to complete");
}
assert_eq!(*block_on(mu.lock()), 0);
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
#[test]
fn transfer_readers_with_read_lock() {
async fn read(mu: Arc<Mutex<usize>>, cv: Arc<Condvar>) {
let mut count = mu.read_lock().await;
while *count == 0 {
count = cv.wait_read(count).await;
}
// Yield once while holding the read lock.
pending!();
}
let mu = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());
let arc_waker = Arc::new(TestWaker);
let waker = waker_ref(&arc_waker);
let mut cx = Context::from_waker(&waker);
let mut futures = [
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
Box::pin(read(mu.clone(), cv.clone())),
];
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
// Increment the count and then grab a read lock.
*block_on(mu.lock()) = 1;
let g = block_on(mu.read_lock());
// Notify the condvar while holding the read lock. This should wake up all the waiters
// rather than just transferring them.
cv.notify_all();
assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
mem::drop(g);
for f in &mut futures {
if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
panic!("future unexpectedly ready");
}
}
assert_eq!(
mu.raw.state.load(Ordering::Relaxed),
READ_LOCK * futures.len()
);
for f in &mut futures {
if let Poll::Pending = f.as_mut().poll(&mut cx) {
panic!("future unable to complete");
}
}
assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
}