Home > OS >  How to finely control the scheduling of rayon?
How to finely control the scheduling of rayon?

Time:04-19

In this problem, I found a way to fill Python's multiprocessing pool (test4). Then I recalled that rayon implements parallelism in iterator style. So I try to implement the same logic in rayon. Here is the code:

use rayon::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let table = Arc::new(Mutex::new(vec![b"| ".repeat(20); 8]));

    let write_table = |msg: &[u8]| {
        let mut table = table.lock().unwrap();
        let thread_idx = rayon::current_thread_index().unwrap();
        let time_idx = now.elapsed().as_secs() as usize * 2   1;
        table[thread_idx][time_idx..time_idx   msg.len()].clone_from_slice(msg);
    };

    rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build_global()
        .unwrap();

    (0i32..10)
        .into_par_iter()
        .map(|_| {
            write_table(b"b b b");
            thread::sleep(Duration::from_secs(3));
            (0i32..3).into_par_iter()
        })
        .flatten()
        .map(|_| {
            write_table(b"s");
            thread::sleep(Duration::from_secs(1));
        })
        .collect::<Vec<_>>();

    println!("##### total: {}s #####", now.elapsed().as_secs());
    println!(
        "{}",
        table
            .lock()
            .unwrap()
            .iter()
            .map(|r| std::str::from_utf8(r).unwrap())
            .collect::<Vec<_>>()
            .join("\n")
    );
}

Rust Playground

But it turns out that rayon by default is lazier than I thought. Here is the output of the code:

##### total: 10s #####
|b b b|s|s|s|b b b|s| | | | | | | | | | 
|b b b|s|s|s| | | |s| | | | | | | | | | 
|b b b|s|s|s| | | | | | | | | | | | | | 
|b b b|s|s|s| | | |s| | | | | | | | | | 
|b b b|s|s|s|b b b|s| | | | | | | | | | 
|b b b|s|s|s| | | |s| | | | | | | | | | 
|b b b|s|s|s| | | |s| | | | | | | | | | 
|b b b|s|s|s| | | | | | | | | | | | | | 

You can see that some big tasks are scheduled after some small tasks. As a result, the thread pool is not fully utilized. So how to finely control the scheduling of rayon to fully utilize it?

Update

From comment:

The procedure in Python's process pool is, when the imap is called, it starts executing on the background. If we call next on the iterator, we will block until the result is returned. This requires an additional queue to store the results and the execution should be async (not sure). I was wondering if we can easily achieve this in Rayon.

After tossing around, I found that Rayon actually provides some convenient methods to help me implement this, like for_each_with and par_bridge. I finally got this version: Rust Playground. But it is unstable. Sometimes it gets a better result like below:

##### total: 9s #####
|b b b|b b b|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s|s|s| | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|s|s|s|s|s| | | | | | | | | | | | 
|b b b|b b b|s|s|s| | | | | | | | | | | 
| | | | | | |s|s| | | | | | | | | | | | 
|b b b|b b b|s|s| | | | | | | | | | | |

Sometimes it gets worse. So I guess this may be an antipattern in Rayon?

CodePudding user response:

Rayon's scheduling strategy is known as “work stealing”. The principle of it is that tasks specify points where they can run in parallel; if we look at the provided interface rayon::join() we can see that at least one way to do it is to specify two functions that are candidates for running in parallel. By default, these two closures run in sequence. But, if a thread from Rayon's thread pool doesn't have any work to do, it will look at the second closure from the pair pairs, and “steal” it to run on that free thread.

This strategy has many advantages, mainly in that there is no overhead of communication between threads (and moving the data to be worked on to another core) except when this would enable additional parallelism.

In your particular use case, this happens to produce a poor utilization pattern, because each b task fans out to many s tasks so it's optimal to complete all the b tasks ASAP, but rayon prefers to finish all the s tasks on the same thread as their b task unless there's a free thread, which there isn't.


Unfortunately, I don't think it's possible to get Rayon's parallel iteration to perform scheduling well for your use case. Your case benefits from doing as much of the b task-starting as possible and disregarding the potential sequentially-runnable s activity after it, which is the opposite of what Rayon assumes is desirable.

However, we can step outside of the iterator interface and explicitly spawn parallel tasks, then feed their outputs through a channel to a parallel bridge. This way, the b tasks are not considered candidates to execute sequentially after their parent s task.

let (b_out, s_in) = crossbeam::channel::unbounded();
let mut final_output: Vec<()> = vec![];
rayon::scope(|scope| {
    for _ in 0i32..10 {
        let b_out = b_out.clone();
        scope.spawn(move |_| {
            write_table(b"b b b");
            thread::sleep(Duration::from_secs(3));
            b_out.send((0i32..3).into_par_iter()).unwrap();
        });
    }
    drop(b_out); // needed to ensure the channel is closed when it has no more items

    final_output = s_in
        .into_iter()
        .par_bridge()
        .flatten()
        .map(|_| {
            write_table(b"s");
            thread::sleep(Duration::from_secs(1));
        })
        .collect();
});

This wastes one of the threads, probably because it is spending most of its time blocking on s_in.into_iter().next(). This could be avoided by

  • using spawn() for the s tasks too, instead of a channel, assuming you don't actually need to collect any outputs (or can use a channel for those outputs), or
  • creating a thread pool with 1 extra thread (which will spend most of its time blocking rather than competing for CPU time).

However, it's still faster than the scheduling you started with (9s instead of 10s).

If you don't actually have any work that benefits from further subdivision than you've shown, you might want to consider avoiding Rayon entirely and running your own thread pool. Rayon is a great generic tool but if your workload has a known and simple shape, you should be able to outperform it with custom code.

  • Related