My Rust code uses RwLock to process data in multiple threads. Each thread fills a common storage while using the read lock (e.g. filling up a database, but my case is a bit different). Eventually, the common storage will fill up. I need to pause all processing, reallocate storage space (e.g. allocate more disk space from cloud), and continue.
// pseudo-code
fn thread_worker(tasks) {
    // `mut` because the guard is re-acquired after a resize
    let mut lock = rwlock.read().unwrap();
    for task in tasks {
        // please ignore out_of_space check race condition
        // it's here just to explain the question
        if out_of_space {
            drop(lock);
            let write_lock = rwlock.write().unwrap();
            // get more storage
            drop(write_lock);
            lock = rwlock.read().unwrap();
        }
        // handle task WITHOUT getting a read lock on every pass
        // getting a lock is far costlier than actual task processing
    }
    drop(lock);
}
Since all threads will hit out of space at about the same time, they can all release the read lock and request a write lock. The first thread that gets the write lock will fix the storage issue. But now I have a possible temporary deadlock situation: all other threads are also waiting for the write lock even though they no longer need it.
So it is possible for this situation to happen: given 3 threads all waiting for write, the 1st gets the write lock, fixes the issue, releases write, and waits for read. The 2nd enters write but quickly skips because the issue is already fixed, and releases. The 1st and 2nd threads will enter read and continue processing, but the 3rd is still waiting for write and will wait for it for a very long time, until the first two either run out of space or finish all their work.
Given all threads waiting for write, how can I "abort" the other threads' waits from the first thread after it finishes its work, but before it releases the write lock it already got?
I saw there is a poisoning feature, but that was designed for panics, and reusing it for production seems wrong and tricky to get done correctly. Also Rust devs are thinking of removing it.
P.S. Each loop iteration is essentially a data[index] = value assignment, where data is a giant memmap shared by many threads. The index is slowly growing in all threads, so eventually all threads run out of memmap size. When that happens, memmap is destroyed, file reallocated, and a new memmap is created. Thus, it is impossible to get a read lock on every loop iteration.
CodePudding user response:
You could have an AtomicBool that serves as gatekeeper for writing: only one thread gets to write() at one time. The other threads that need to write give up as soon as they find that there's already a writer, and then they simply fall back to reading. So instead of "aborting" the other writers, you prevent them from initiating a write() to begin with.
For example, assuming a base implementation that looks like this (playground):
use parking_lot::RwLock;

pub struct Data<T> {
    store: RwLock<T>,
}

impl<T: Default> Data<T> {
    pub fn process(&self, needed_size: usize, f: impl FnOnce(&T)) {
        let mut store = self.store.read();
        if Self::needs_resize(&store, needed_size) {
            drop(store);
            let mut wstore = self.store.write();
            Self::resize(&mut wstore, needed_size);
            drop(wstore);
            store = self.store.read();
        }
        f(&store)
    }

    fn needs_resize(_store: &T, _needed_size: usize) -> bool {
        unimplemented!()
    }

    fn resize(store: &mut T, to_size: usize) {
        if !Self::needs_resize(store, to_size) {
            return; // someone else reserved enough for us
        }
        unimplemented!()
    }
}
The implementation that avoids unnecessary writes might then look like this:
use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};

pub struct Data<T> {
    store: RwLock<T>,
    has_writer: AtomicBool,
}

impl<T: Default> Data<T> {
    pub fn process(&self, needed_size: usize, f: impl FnOnce(&T)) {
        loop {
            let mut store = self.store.read();
            if Self::needs_resize(&store, needed_size) {
                drop(store);
                // swap() returns the previous value: true means a writer already exists
                if self.has_writer.swap(true, Ordering::SeqCst) {
                    continue; // someone else is writing, go back to read
                }
                // we are the only thread that is allowed to request the write lock
                let mut wstore = self.store.write();
                Self::resize(&mut wstore, needed_size);
                drop(wstore);
                self.has_writer.store(false, Ordering::SeqCst);
                store = self.store.read();
            }
            break f(&store);
        }
    }
    // needs_resize() and resize() the same stubs as before
    ...
}
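For something that compiles without external crates, here is a minimal sketch of the same gatekeeper idea on top of std::sync::RwLock. The Vec<u64> store (standing in for the memmap), the doubling policy, the yield_now() call and the run_gatekeeper_demo helper are my own assumptions for illustration, not part of the code above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical growable store; `Vec<u64>` stands in for the real storage.
pub struct Data {
    store: RwLock<Vec<u64>>,
    has_writer: AtomicBool,
}

impl Data {
    pub fn new(len: usize) -> Self {
        Data {
            store: RwLock::new(vec![0; len]),
            has_writer: AtomicBool::new(false),
        }
    }

    /// Runs `f` under a read lock once the store holds at least `needed` slots.
    pub fn process<R>(&self, needed: usize, f: impl FnOnce(&[u64]) -> R) -> R {
        loop {
            let store = self.store.read().unwrap();
            if store.len() >= needed {
                return f(store.as_slice());
            }
            drop(store);
            // Only the thread that flips the flag from false to true may write.
            if self.has_writer.swap(true, Ordering::SeqCst) {
                thread::yield_now(); // someone else is resizing; retry the read
                continue;
            }
            let mut wstore = self.store.write().unwrap();
            if wstore.len() < needed {
                let new_len = needed.max(wstore.len() * 2);
                wstore.resize(new_len, 0);
            }
            drop(wstore);
            self.has_writer.store(false, Ordering::SeqCst);
            // Loop again: the next pass takes the read lock and runs `f`.
        }
    }
}

/// Spawns `threads` workers that each ask for 1..=max_needed slots in turn,
/// then returns the final length of the store.
pub fn run_gatekeeper_demo(threads: usize, max_needed: usize) -> usize {
    let data = Arc::new(Data::new(1));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || {
                for needed in 1..=max_needed {
                    data.process(needed, |s| assert!(s.len() >= needed));
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let len = data.store.read().unwrap().len();
    len
}
```

Calling run_gatekeeper_demo(4, 100) exercises the path where several threads notice the shortage at once. Threads that lose the swap() never queue for the write lock; they retry the read until the resize has landed.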
CodePudding user response:
Assuming that the tasks are independent and relatively short, the easiest way to solve this is to not hold a read lock for the whole batch of work, but unlock it for each task:
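// pseudo-code
fn thread_worker(tasks) {
    for task in tasks {
        if out_of_space {
            let write_lock = rwlock.write().unwrap();
            // get more storage
            ...
        } // write_lock is released here, at the end of the block
        // process task
        let lock = rwlock.read().unwrap();
        ...
    } // lock is released here, at the end of each iteration
}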
Note how this avoids explicit drops, which is usually good practice (RAII): each guard is dropped automatically at the end of the block that uses it.
This version still has the issue described in the question, but only for a very short period of time, especially if the number of tasks is much bigger than the number of workers.
To make it even more robust, you might get inspiration from std::sync::Barrier and implement something similar with a Condvar and a counter.
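One way the Condvar-and-counter idea could look is sketched below. It is only an illustration under my own assumptions: GrowGate, grow_once, run_gate_demo, the generation counter and the Vec<AtomicU64> store (standing in for the memmap) are all hypothetical names. The first thread to report a shortage does the resize; the others sleep on the Condvar instead of queuing for the write lock, and are woken when the counter changes.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

struct GateState {
    generation: u64, // number of completed resizes
    resizing: bool,  // true while one thread is resizing
}

pub struct GrowGate {
    state: Mutex<GateState>,
    done: Condvar,
}

impl GrowGate {
    pub fn new() -> Self {
        GrowGate {
            state: Mutex::new(GateState { generation: 0, resizing: false }),
            done: Condvar::new(),
        }
    }

    pub fn generation(&self) -> u64 {
        self.state.lock().unwrap().generation
    }

    /// `seen` is the generation the caller observed while it still held the
    /// read lock. Returns true if this thread ran `grow`.
    pub fn grow_once(&self, seen: u64, grow: impl FnOnce()) -> bool {
        let mut st = self.state.lock().unwrap();
        if st.generation != seen {
            return false; // already resized since the caller looked
        }
        if st.resizing {
            // Another thread is resizing: sleep until it publishes the result.
            while st.generation == seen {
                st = self.done.wait(st).unwrap();
            }
            return false;
        }
        st.resizing = true;
        drop(st); // do not hold the mutex while waiting for the write lock
        grow();
        let mut st = self.state.lock().unwrap();
        st.resizing = false;
        st.generation += 1;
        drop(st);
        self.done.notify_all();
        true
    }
}

/// Workers write slot `i` for their share of `0..total`, doubling the store
/// on demand. Returns (final length, number of resizes, all slots written).
pub fn run_gate_demo(threads: usize, total: usize) -> (usize, u64, bool) {
    let data = Arc::new(RwLock::new(vec![AtomicU64::new(0)]));
    let gate = Arc::new(GrowGate::new());
    let workers: Vec<_> = (0..threads)
        .map(|t| {
            let (data, gate) = (Arc::clone(&data), Arc::clone(&gate));
            thread::spawn(move || {
                let mut guard = data.read().unwrap(); // held across tasks
                for i in (t..total).step_by(threads) {
                    while i >= guard.len() {
                        let seen = gate.generation();
                        drop(guard);
                        gate.grow_once(seen, || {
                            let mut w = data.write().unwrap();
                            let new_len = w.len() * 2;
                            w.resize_with(new_len, || AtomicU64::new(0));
                        });
                        guard = data.read().unwrap();
                    }
                    guard[i].store(i as u64 + 1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    let store = data.read().unwrap();
    let ok = (0..total).all(|i| store[i].load(Ordering::Relaxed) == i as u64 + 1);
    (store.len(), gate.generation(), ok)
}
```

In this sketch only one thread ever requests the write lock per resize, so nobody is left stranded in the writer queue. A panic inside grow would leave resizing set and the waiters asleep, so real code would need to handle that case.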
P.S. Consider an alternative design where, instead of the workers handling the reallocation, it is performed by a separate dedicated management task. When the out_of_space condition is detected, you send a signal to that space manager (using a Condvar or an mpsc channel), and it performs the reallocation while the workers sleep and check out_of_space periodically.
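A rough sketch of that manager design with an mpsc channel follows. Everything concrete in it is an assumption made for illustration: the run_manager_demo helper, the Vec<AtomicU64> store, the power-of-two growth policy and the 1 ms polling interval. Workers send the size they need and then poll; the manager thread is the only one that ever takes the write lock.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Workers write slot `i` for their share of `0..total`; a dedicated manager
/// thread grows the store on request. Returns (final length, all slots written).
pub fn run_manager_demo(threads: usize, total: usize) -> (usize, bool) {
    let data = Arc::new(RwLock::new(vec![AtomicU64::new(0)]));
    let (tx, rx) = mpsc::channel::<usize>();

    // The space manager: the only thread that takes the write lock.
    let manager = {
        let data = Arc::clone(&data);
        thread::spawn(move || {
            // The loop ends once every sender has been dropped.
            for needed in rx {
                let mut w = data.write().unwrap();
                if w.len() < needed {
                    // Requests that are already satisfied are skipped.
                    w.resize_with(needed.next_power_of_two(), || AtomicU64::new(0));
                }
            }
        })
    };

    let workers: Vec<_> = (0..threads)
        .map(|t| {
            let (data, tx) = (Arc::clone(&data), tx.clone());
            thread::spawn(move || {
                let mut guard = data.read().unwrap(); // held across tasks
                for i in (t..total).step_by(threads) {
                    if i >= guard.len() {
                        drop(guard); // let the manager get the write lock
                        tx.send(i + 1).unwrap();
                        // Sleep and re-check until the manager has made room.
                        loop {
                            thread::sleep(Duration::from_millis(1));
                            guard = data.read().unwrap();
                            if i < guard.len() {
                                break;
                            }
                            drop(guard);
                        }
                    }
                    guard[i].store(i as u64 + 1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    drop(tx); // keep only the workers' senders alive
    for w in workers {
        w.join().unwrap();
    }
    manager.join().unwrap();

    let store = data.read().unwrap();
    let ok = (0..total).all(|i| store[i].load(Ordering::Relaxed) == i as u64 + 1);
    (store.len(), ok)
}
```

The polling keeps the sketch short; replacing the sleep with a Condvar that the manager notifies after each resize would avoid the fixed delay.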
P.P.S. It is not clear from your description what the read() lock protects (I assumed you need it somehow). If you are fine with running processing and requesting storage at the same time, then a plain mutex also does the job:
// pseudo-code
fn thread_worker(tasks) {
    for task in tasks {
        if out_of_space {
            let write_lock = mutex.lock().unwrap();
            // get more storage
            ...
        }
        // process task
        ...
    }
}