This code runs to completion and prints the values of my_data if I uncomment the sleep line in the do_work function. If I leave it commented out, my executable hangs every time.
The question "Why does a Condvar not wake the last thread?" mentions collecting the handles and joining them in the main thread, but the rayon scope should already take care of that, correct?
How can I have this code complete without the sleep statement in do_work()?
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Barrier, Condvar, Mutex,
    },
    thread,
    time::Duration,
};

fn do_work(
    mtx: Arc<Mutex<bool>>,
    cond_var: Arc<Condvar>,
    barrier: Arc<Barrier>,
    quitting: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        barrier.wait();
        //thread::sleep(Duration::from_micros(1));
        let mut started = mtx.lock().unwrap();
        while !*started && !quitting.load(Ordering::SeqCst) {
            started = cond_var.wait(started).unwrap();
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i = 1.0);
        }
    }
    println!("{:?} Joining", thread::current().id());
}

fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
    let mut started = mtx.lock().unwrap();
    *started = true;
    cond_var.notify_all();
}

fn reset_work(mtx: Arc<Mutex<bool>>) {
    let mut started = mtx.lock().unwrap();
    *started = false;
}

fn main() {
    let num_threads = 4;
    // One barrier slot per worker, plus one for the main thread.
    let test_barrier = Arc::new(Barrier::new(num_threads + 1));
    let test_mutex = Arc::new(Mutex::new(false));
    let test_cond_var = Arc::new(Condvar::new());
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_mtx = test_mutex.clone();
            let thread_cond_var = test_cond_var.clone();
            let thread_barrier = Arc::clone(&test_barrier);
            let temp = &quitting;
            s.spawn(move |_| do_work(thread_mtx, thread_cond_var, thread_barrier, &temp, chunk));
        }
        test_barrier.wait();
        let _upper_bound = 1024 / num_threads;
        for _i in 0..10 {
            start_work(test_mutex.clone(), test_cond_var.clone());
            test_barrier.wait();
            reset_work(test_mutex.clone());
        }
        quitting.store(true, Ordering::SeqCst);
    });
    println!("my_data is: {:?}", my_data);
}
Cargo.toml dependencies:
rayon = "*"
This is a test for more complicated math that do_work will do later on, but for now I am just trying to get a series of threads that each successfully modify a piece of a larger Vec.
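A note on the shutdown path in the code above: a thread blocked in cond_var.wait() is normally woken only by a notification, so storing true into quitting does not by itself wake it. Below is a minimal sketch of one way to make that step reliable, assuming the quit flag is moved inside the mutex and the main thread calls notify_all() after setting it. The Signal struct and the shutdown_demo function are hypothetical names introduced for illustration, and std::thread::scope (Rust 1.63+) stands in for rayon::scope so the example has no dependencies.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: both flags live inside the mutex, so a waiter
/// cannot miss a change between checking the flags and going to sleep.
struct Signal {
    started: bool,
    quitting: bool,
}

/// Spawns `num_threads` waiters, then asks them to quit without starting work.
/// Returns how many waiters observed the quit request and returned.
fn shutdown_demo(num_threads: usize) -> usize {
    let state = Mutex::new(Signal { started: false, quitting: false });
    let cond_var = Condvar::new();

    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..num_threads {
            handles.push(s.spawn(|| {
                let mut guard = state.lock().unwrap();
                // Sleep until either flag is set; both are read under the lock.
                while !guard.started && !guard.quitting {
                    guard = cond_var.wait(guard).unwrap();
                }
                guard.quitting
            }));
        }

        // Set the flag while holding the lock, then wake every waiter.
        state.lock().unwrap().quitting = true;
        cond_var.notify_all();

        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|&saw_quit| saw_quit)
            .count()
    })
}

fn main() {
    println!("{} waiters exited", shutdown_demo(4));
}
```

This covers only the shutdown signal. The per-round handshake between the main thread and the workers still needs its own synchronization.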
Answer:
I got the intended behavior working. The solution seems especially convoluted, and there should be a better way; I would happily accept an answer that is simpler than what I have. It does at least produce the desired behavior: a thread pool with pre-spun threads that do work in bursts whenever they receive the proper signal from the master thread, and that can shut down deterministically. The extra starting and finishing flags provide a handshaking mechanism to ensure there are no race conditions coming up to the barriers.
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Barrier,
    },
    thread,
};

fn do_work(
    start_barrier: &Barrier,
    finish_barrier: &Barrier,
    quitting: &AtomicBool,
    starting: &AtomicBool,
    finishing: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        start_barrier.wait();
        // Spin until the main thread raises `starting` (or asks us to quit).
        while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            std::hint::spin_loop();
        }
        {
            // Previous condvar-based wait, replaced by the spin loop above:
            // let mut started = mtx.lock().unwrap();
            // while !*started && !quitting.load(Ordering::SeqCst) {
            //     started = cond_var.wait(started).unwrap();
            // }
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i = 1.0);
        }
        finish_barrier.wait();
        // Spin until the main thread raises `finishing` (or asks us to quit).
        while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            std::hint::spin_loop();
        }
    }
    println!("{:?} Joining", thread::current().id());
}

fn main() {
    let num_threads = 4;
    // One barrier slot per worker, plus one for the main thread.
    let start_barrier = Barrier::new(num_threads + 1);
    let finish_barrier = Barrier::new(num_threads + 1);
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    let starting = AtomicBool::new(false);
    let finishing = AtomicBool::new(false);
    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_start_barrier = &start_barrier;
            let thread_finish_barrier = &finish_barrier;
            let thread_quitting = &quitting;
            let thread_starting = &starting;
            let thread_finishing = &finishing;
            s.spawn(move |_| {
                do_work(
                    thread_start_barrier,
                    thread_finish_barrier,
                    thread_quitting,
                    thread_starting,
                    thread_finishing,
                    chunk,
                )
            });
        }
        let num_rounds = 10;
        for i in 0..num_rounds {
            let start = std::time::Instant::now();
            start_barrier.wait();
            finishing.store(false, Ordering::SeqCst);
            starting.store(true, Ordering::SeqCst);
            finish_barrier.wait();
            if i == num_rounds - 1 {
                quitting.store(true, Ordering::SeqCst);
            }
            finishing.store(true, Ordering::SeqCst);
            starting.store(false, Ordering::SeqCst);
            println!("Round {} took: {:?}", i, start.elapsed());
        }
    });
    println!("my_data is: {:?}", my_data);
}
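As a possibly simpler variant of the same burst pattern, here is a sketch that drops the spin flags and uses a single reusable Barrier: each round is one wait to release the workers and one wait to collect them, and shutdown is a final wait after setting quitting. This is not the code above, only an illustration under stated assumptions: std::thread::scope (Rust 1.63+) replaces rayon::scope so that every worker is guaranteed its own OS thread, the worker and run_rounds names are hypothetical, and the work is changed to add 1.0 per round so the number of completed rounds is visible in the data.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Barrier;
use std::thread;

/// Worker loop: wait for a start signal, do one burst of work, report back.
fn worker(barrier: &Barrier, quitting: &AtomicBool, range: &mut [f32]) {
    loop {
        barrier.wait(); // start of a round, or the shutdown release
        if quitting.load(Ordering::SeqCst) {
            break;
        }
        // Stand-in for the real math: bump every element once per round.
        range.iter_mut().for_each(|x| *x += 1.0);
        barrier.wait(); // end of the round
    }
}

/// Hypothetical driver: runs `rounds` bursts over `data` on up to `num_threads` workers.
fn run_rounds(data: &mut [f32], num_threads: usize, rounds: usize) {
    let n = num_threads.max(1);
    let chunk_size = ((data.len() + n - 1) / n).max(1);
    let chunks: Vec<&mut [f32]> = data.chunks_mut(chunk_size).collect();
    // Size the barrier from the actual chunk count, plus one for this thread.
    let barrier = Barrier::new(chunks.len() + 1);
    let quitting = AtomicBool::new(false);

    thread::scope(|s| {
        for chunk in chunks {
            let (barrier, quitting) = (&barrier, &quitting);
            s.spawn(move || worker(barrier, quitting, chunk));
        }
        for _ in 0..rounds {
            barrier.wait(); // release the workers
            barrier.wait(); // wait until every worker has finished
        }
        // Workers are parked at the start barrier: set the flag, then release them.
        quitting.store(true, Ordering::SeqCst);
        barrier.wait();
    });
}

fn main() {
    let mut my_data = vec![0.0_f32; 1024];
    run_rounds(&mut my_data, 4, 10);
    println!("first element after 10 rounds: {}", my_data[0]);
}
```

Because the quit flag is only written while every worker is parked at the start of a round, and only read right after that barrier releases, no extra handshake flags should be needed. The trade-off is that each worker blocks a whole thread between rounds, so a pool with fewer threads than workers would stall.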