What happens when a thread that calls Condvar::wait(condvar, some_mutex) is spuriously woken?

Time:06-10

If a thread is sleeping, waiting on a notify_one / notify_all call, but is woken before that call happens, does the call to wait return or does the thread continue to wait?

I've read the docs (for Condvar::wait) and it's unclear to me whether a call to wait returns as soon as the thread is woken:

Note that this function is susceptible to spurious wakeups. Condition variables normally have a boolean predicate associated with them, and the predicate must always be checked each time this function returns to protect against spurious wakeups.

If the call to wait does return as soon as the thread is woken, then what does it return? It has to return some kind of guard:

pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;

A specific example (in particular, the recv method):

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};

pub struct Receiver<T> {
    shared: Arc<Shared<T>>
}

impl<T> Receiver<T> {
    pub fn recv(&self) -> Option<T> {
        let mut inner_mutex_guard: MutexGuard<Inner<T>> = self.shared.inner.lock().unwrap();
        loop {
            match inner_mutex_guard.queue.pop_front() {
                Some(t) => return Some(t),
                None => {
                    if inner_mutex_guard.senders == 0 {
                        return None
                    } else {
                        inner_mutex_guard = Condvar::wait(&self.shared.available, inner_mutex_guard).unwrap();
                    }
                }
            }
        }
    }
}
 
pub struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize
}  

pub struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

Answer:

Your example is correct, although I would write

inner_mutex_guard = self.shared.available.wait(inner_mutex_guard).unwrap();

instead of

inner_mutex_guard = Condvar::wait(&self.shared.available, inner_mutex_guard).unwrap();

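If a spurious wakeup happens, wait returns anyway (with the mutex locked again), your loop calls pop_front() again, and if that returns None while senders is still nonzero, the loop enters another .wait(). So wait returns on every wakeup, spurious or not, and your loop handles that correctly.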
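To check this end to end, here is a minimal runnable sketch that fills in the parts the question leaves out. The `Sender` type, its `send` method, its `Clone`/`Drop` impls and the `channel` constructor are hypothetical additions of mine, not from the question; `recv` follows the question's loop, using method-call syntax for `wait`.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

// Hypothetical sending half (not part of the question's code).
struct Sender<T> {
    shared: Arc<Shared<T>>,
}

struct Receiver<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Sender<T> {
    fn send(&self, t: T) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.queue.push_back(t);
        drop(inner); // release the lock before waking the receiver
        self.shared.available.notify_one();
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        self.shared.inner.lock().unwrap().senders += 1;
        Sender { shared: Arc::clone(&self.shared) }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders -= 1;
        let last = inner.senders == 0;
        drop(inner);
        if last {
            // Wake the receiver so it can observe senders == 0 and return None.
            self.shared.available.notify_one();
        }
    }
}

impl<T> Receiver<T> {
    fn recv(&self) -> Option<T> {
        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(t) => return Some(t),
                // Queue empty and no senders left: the channel is closed.
                None if inner.senders == 0 => return None,
                // Queue empty: wait. After any wakeup (spurious or not) the loop re-checks.
                None => inner = self.shared.available.wait(inner).unwrap(),
            }
        }
    }
}

// Hypothetical constructor: one sender, one receiver.
fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Shared {
        inner: Mutex::new(Inner { queue: VecDeque::new(), senders: 1 }),
        available: Condvar::new(),
    });
    (Sender { shared: Arc::clone(&shared) }, Receiver { shared })
}

fn main() {
    let (tx, rx) = channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i);
        }
        // tx is dropped here, which closes the channel.
    });
    while let Some(v) = rx.recv() {
        println!("received {v}");
    }
    producer.join().unwrap();
}
```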

I suspect you got confused by "Condition variables normally have a boolean predicate associated with them". Your boolean predicate is just a little hidden: it is "does .pop_front() return Some (or are there no senders left)?", and you do check it, as required, after every wakeup.
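If you prefer the predicate to be visible, `Condvar::wait_while` takes it as a closure and re-checks it after every wakeup. Below is a sketch, assuming the question's `Inner`/`Shared` layout; the free function `recv` is my own stand-in for the method, and it should behave the same as the question's loop.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

// Predicate spelled out: keep waiting while the queue is empty AND a sender is alive.
fn recv<T>(shared: &Shared<T>) -> Option<T> {
    let guard = shared.inner.lock().unwrap();
    // wait_while re-evaluates the closure after every wakeup, spurious or not.
    let mut inner = shared
        .available
        .wait_while(guard, |inner| inner.queue.is_empty() && inner.senders > 0)
        .unwrap();
    // Now the queue is non-empty or no senders remain, so None means "closed".
    inner.queue.pop_front()
}

fn main() {
    let shared = Shared {
        inner: Mutex::new(Inner { queue: VecDeque::from([1, 2]), senders: 0 }),
        available: Condvar::new(),
    };
    // With no senders left, recv never blocks: it drains the queue, then returns None.
    while let Some(v) = recv(&shared) {
        println!("got {v}");
    }
}
```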

The return type of .wait() is another MutexGuard (wrapped in a LockResult). For anything to change while you wait, another thread must be able to modify the locked value, so the mutex has to be unlocked while waiting.

Because a hand-written unlock-wait-lock sequence has several easy-to-miss race conditions (for example, a notification sent after you unlock but before you start waiting would be lost), it is done in a single call: .wait() atomically unlocks the mutex and blocks, and once the thread is woken (by a notification or spuriously) it locks the mutex again before returning. That's why it returns a new MutexGuard: it guards the re-locked and potentially changed value.
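A small sketch of the unlock-wait-relock behaviour, using a hypothetical helper `wait_for_change`. The main thread holds the lock except while it is inside `wait()`, so the writer thread can only get in during the wait; the guard that `wait()` returns then sees the new value.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical helper: waits until another thread changes the value from 0.
fn wait_for_change() -> u32 {
    let pair = Arc::new((Mutex::new(0u32), Condvar::new()));
    let (lock, cvar) = &*pair;

    // Take the lock *before* spawning the writer.
    let mut guard = lock.lock().unwrap();

    let writer = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cvar) = &*pair;
            // Succeeds only while the main thread is inside wait(), which releases the mutex.
            *lock.lock().unwrap() = 42;
            cvar.notify_one();
        })
    };

    while *guard == 0 {
        // wait() consumes the old guard, unlocks, blocks, re-locks, and returns a new guard.
        guard = cvar.wait(guard).unwrap();
    }
    let seen = *guard;
    drop(guard);
    writer.join().unwrap();
    seen
}

fn main() {
    println!("value after wait: {}", wait_for_change());
}
```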

To be honest, I'm not sure why they designed it that way; they could just as well have used .wait(&mut MutexGuard) instead of .wait(MutexGuard) -> MutexGuard. But who knows.

Edit: I'm fairly sure they return a guard because re-locking can fail (the mutex may have been poisoned by a thread that panicked while holding it); that's why wait doesn't actually return a bare MutexGuard but a LockResult<MutexGuard>, i.e. a Result.
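A sketch of the poisoned case, with a hypothetical helper `wait_sees_poison`. A helper thread sets the flag, notifies, and then panics while still holding the lock, which poisons the mutex; the waiter's `wait()` then returns `Err`, but the `PoisonError` still carries the re-acquired guard (`into_inner`). The panic message printed to stderr is expected.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Returns true if wait() reported the mutex as poisoned when re-locking.
fn wait_sees_poison() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let (lock, cvar) = &*pair;
    // Lock first, so the helper can only get the lock while we are inside wait().
    let mut ready = lock.lock().unwrap();

    let helper = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cvar) = &*pair;
            let mut ready = lock.lock().unwrap();
            *ready = true;
            cvar.notify_one();
            // Panicking while the guard is alive poisons the mutex.
            panic!("helper panicked while holding the lock");
        })
    };

    let mut saw_poison = false;
    while !*ready {
        ready = match cvar.wait(ready) {
            Ok(guard) => guard,
            // The Err variant still contains the re-acquired guard.
            Err(poisoned) => {
                saw_poison = true;
                poisoned.into_inner()
            }
        };
    }
    drop(ready);
    // The helper panicked, so join() returns Err.
    assert!(helper.join().is_err());
    saw_poison
}

fn main() {
    println!("wait saw poison: {}", wait_sees_poison());
}
```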

Edit #2: It cannot be passed in by reference, because during the wait no MutexGuard exists: the previous one is consumed, and the new one only exists once the data is locked again. A MutexGuard has no "empty" state (like None), so it has to be consumed and dropped, which isn't possible through a reference.
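To illustrate the "no empty state" point: a by-reference API would force the caller to store the guard in an `Option` so it can be taken out for the duration of the wait. `wait_timeout_in_place` below is a hypothetical helper, not a std API; it uses `wait_timeout` only so the example returns without a notifier.

```rust
use std::sync::{Condvar, Mutex, MutexGuard};
use std::time::Duration;

// Hypothetical by-reference API: the Option provides the "empty" state a guard lacks.
fn wait_timeout_in_place<'a, T>(
    cvar: &Condvar,
    slot: &mut Option<MutexGuard<'a, T>>,
    dur: Duration,
) {
    // Move the guard out; while waiting, the slot is None (no guard exists).
    let guard = slot.take().expect("slot must hold a guard");
    let (guard, _timeout) = cvar.wait_timeout(guard, dur).unwrap();
    // Put the re-acquired guard back.
    *slot = Some(guard);
}

fn main() {
    let mutex = Mutex::new(5);
    let cvar = Condvar::new();
    let mut slot = Some(mutex.lock().unwrap());
    // Nobody notifies, so this returns after the timeout (or a spurious wakeup).
    wait_timeout_in_place(&cvar, &mut slot, Duration::from_millis(10));
    println!("still locked, value = {}", **slot.as_ref().unwrap());
}
```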


Explanation of Condvar

~ original answer before an example was added ~

You have to understand that a Condvar is mostly an optimization: it lets a thread sleep instead of busy-waiting (repeatedly polling) for a condition. That's why it takes a guard: it's meant to be combined with a locked value that you want to watch for changes. Whenever it wakes up, check whether the value now satisfies your condition, and if not, go back to waiting. wait is meant to be called in a loop.

Here's an example:

use std::{
    sync::{Arc, Condvar, Mutex},
    thread::{sleep, JoinHandle},
    time::Duration,
};

struct Counter {
    value: Mutex<u32>,
    condvar: Condvar,
}

fn start_counting_thread(counter: Arc<Counter>) -> JoinHandle<()> {
    std::thread::spawn(move || loop {
        sleep(Duration::from_millis(100));

        let mut value = counter.value.lock().unwrap();
        *value += 1;
        counter.condvar.notify_all();

        if *value > 15 {
            break;
        }
    })
}

fn main() {
    let counter = Arc::new(Counter {
        value: Mutex::new(0),
        condvar: Condvar::new(),
    });

    let counting_thread = start_counting_thread(counter.clone());

    // Wait until the value is greater than 10
    let mut value = counter.value.lock().unwrap();
    while *value <= 10 {
        println!("Value is {value}, waiting ...");
        value = counter.condvar.wait(value).unwrap();
    }
    println!("Condition met. Value is now {}.", *value);

    // Unlock value
    drop(value);

    // Wait for counting thread to finish
    counting_thread.join().unwrap();
}

Output:
Value is 0, waiting ...
Value is 1, waiting ...
Value is 2, waiting ...
Value is 3, waiting ...
Value is 4, waiting ...
Value is 5, waiting ...
Value is 6, waiting ...
Value is 7, waiting ...
Value is 8, waiting ...
Value is 9, waiting ...
Value is 10, waiting ...
Condition met. Value is now 11.

If you don't want to write the loop manually but just want to wait until a condition is met, use wait_while. It keeps waiting as long as the closure returns true, re-checking it after every wakeup:

use std::{
    sync::{Arc, Condvar, Mutex},
    thread::{sleep, JoinHandle},
    time::Duration,
};

struct Counter {
    value: Mutex<u32>,
    condvar: Condvar,
}

fn start_counting_thread(counter: Arc<Counter>) -> JoinHandle<()> {
    std::thread::spawn(move || loop {
        sleep(Duration::from_millis(100));

        let mut value = counter.value.lock().unwrap();
        *value += 1;
        counter.condvar.notify_all();

        if *value > 15 {
            break;
        }
    })
}

fn main() {
    let counter = Arc::new(Counter {
        value: Mutex::new(0),
        condvar: Condvar::new(),
    });

    let counting_thread = start_counting_thread(counter.clone());

    // Wait until the value is greater than 10
    let mut value = counter.value.lock().unwrap();
    value = counter.condvar.wait_while(value, |val| *val <= 10).unwrap();
    println!("Condition met. Value is now {}.", *value);

    // Unlock value
    drop(value);

    // Wait for counting thread to finish
    counting_thread.join().unwrap();
}

Output:
Condition met. Value is now 11.
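For comparison, wait_while saves you exactly the manual loop from the first example. Below is a sketch of that loop as a hypothetical free function `my_wait_while` with the same shape of signature as `Condvar::wait_while`; I believe std's implementation is essentially this loop, but treat that as an assumption. `count_until` is also hypothetical and expects a threshold below `COUNT_TO`.

```rust
use std::sync::{Arc, Condvar, LockResult, Mutex, MutexGuard};
use std::thread;

// Hypothetical re-implementation of the loop that wait_while performs.
fn my_wait_while<'a, T, F>(
    cvar: &Condvar,
    mut guard: MutexGuard<'a, T>,
    mut condition: F,
) -> LockResult<MutexGuard<'a, T>>
where
    F: FnMut(&mut T) -> bool,
{
    // Keep waiting while the condition holds; re-check after every wakeup.
    while condition(&mut *guard) {
        guard = cvar.wait(guard)?;
    }
    Ok(guard)
}

const COUNT_TO: u32 = 20;

// Waits until a counting thread pushes the value above `threshold` (must be < COUNT_TO).
fn count_until(threshold: u32) -> u32 {
    let pair = Arc::new((Mutex::new(0u32), Condvar::new()));
    let counter = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cvar) = &*pair;
            for _ in 0..COUNT_TO {
                *lock.lock().unwrap() += 1;
                cvar.notify_all();
            }
        })
    };
    let (lock, cvar) = &*pair;
    let guard = my_wait_while(cvar, lock.lock().unwrap(), |v| *v <= threshold).unwrap();
    let seen = *guard;
    drop(guard);
    counter.join().unwrap();
    seen
}

fn main() {
    println!("stopped waiting at {}", count_until(10));
}
```

The exact value printed depends on scheduling; it is only guaranteed to be above the threshold.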