Home > Enterprise >  How to parallelize this loop?
How to parallelize this loop?

Time:05-21

I'm running into a wall with the borrow checker. I think it is because the compiler can't tell that the borrowed data is no longer used by the worker once the main thread has joined it. I've reduced my problem to the simplest example of the same pattern that I could come up with. I'm trying to avoid unnecessary allocations and copies, especially of b.

/// Single-threaded reference version of the computation.
///
/// Runs 100 rounds. In each round every element of `b` is XOR-ed with the
/// element of `a` at the same position, `b` is sorted, and then every element
/// of `a` is overwritten with `1`. Returns `b` after the last round.
fn sequential_fn() -> Vec<i32> {
    // Two mutable vectors: `a` is only read during the heavy step, `b` is written.
    let mut a = vec![1, 2, 3, 4];
    let mut b = vec![0; 4];

    for _ in 0..100 {
        // Stand-in for the heavy operation: reads `a`, updates `b` in place.
        for (dst, src) in b.iter_mut().zip(a.iter()) {
            *dst ^= *src;
        }
        // Heavy step done; `b` is put in order before the next round.
        b.sort();

        // `a` is modified (by the main thread) before the process repeats.
        a.iter_mut().for_each(|e| *e = 1);
    }

    b
}

The sequential version above compiles and works fine. This is how I've attempted to parallelize the heavy operation to split up the workload:

// Attempted parallel version of `sequential_fn`: `b` is split into two
// mutable halves, a spawned worker XORs the second half with `a` while the
// main thread XORs the first half, and after joining the worker the main
// thread sorts `b` and resets `a`.
//
// NOTE: this version is rejected by the borrow checker. `std::thread::spawn`
// requires its closure to be `'static`, but the closure borrows the locals
// `a` and `b2`; the lines tagged (err1)..(err4) are where the compiler
// reports the resulting conflicts.
fn parallel_fn() -> Vec<i32>
{
    let mut a = vec![1,2,3,4];
    let mut b = vec![0;4];
    let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)

    for repetitions in 0..100
    {
        //only 1 worker for example purposes
        let worker = std::thread::spawn(||  //(err2)
        {
            for i in 2..4 
            {
                b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
            }
        });
        for i in 0..2
        {
            b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a 
        }
        worker.join(); //workers finish
        b.sort(); //borrow b as mutable in main thread only (err3)
        for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
            { *e  = 1;}
    }
    return b;
}

The errors I'm getting are:

error[E0597]: `b` does not live long enough
  --> src/lib.rs:5:29
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             ^^^^^^^^^^^^^^^^^
   |                             |
   |                             borrowed value does not live long enough
   |                             argument requires that `b` is borrowed for `'static`
...
27 | }
   | - `b` dropped here while still borrowed

error[E0499]: cannot borrow `*b2` as mutable more than once at a time
  --> src/lib.rs:10:41
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |                        -                  ^^ `*b2` was mutably borrowed here in the previous iteration of the loop
   |  ______________________|
   | |
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   | |                 -- borrows occur due to use of `*b2` in closure
15 | |             }
16 | |         });
   | |__________- argument requires that `*b2` is borrowed for `'static`

error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function
  --> src/lib.rs:10:41
   |
10 |         let worker = std::thread::spawn(||  //(err2)
   |                                         ^^ may outlive borrowed value `a`
...
14 |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   |                            - `a` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/lib.rs:10:22
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |  ______________________^
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
15 | |             }
16 | |         });
   | |__________^
help: to force the closure to take ownership of `a` (and any other referenced variables), use the `move` keyword
   |
10 |         let worker = std::thread::spawn(move ||  //(err2)
   |                                             

error[E0499]: cannot borrow `b` as mutable more than once at a time
  --> src/lib.rs:22:9
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             ----------------- first mutable borrow occurs here
...
19 |             b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a 
   |             ----- first borrow later used here
...
22 |         b.sort(); //borrow b as mutable in main thread only (err3)
   |         ^^^^^^^^ second mutable borrow occurs here

error[E0502]: cannot borrow `a` as mutable because it is also borrowed as immutable
  --> src/lib.rs:23:18
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |                        -                  -- immutable borrow occurs here
   |  ______________________|
   | |
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   | |                            - first borrow occurs due to use of `a` in closure
15 | |             }
16 | |         });
   | |__________- argument requires that `a` is borrowed for `'static`
...
23 |           for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
   |                    ^^^^^^^^^^^^ mutable borrow occurs here

error[E0505]: cannot move out of `b` because it is borrowed
  --> src/lib.rs:26:12
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             -----------------
   |                             |
   |                             borrow of `b` occurs here
   |                             argument requires that `b` is borrowed for `'static`
...
26 |     return b;
   |            ^ move out of `b` occurs here

Playground

CodePudding user response:

You've made a number of assumptions in your code that the Rust borrow checker can't verify at compile time:

  • When parallel_fn finishes, a and b are dropped. Calling .join() ensures the spawned thread has finished by then, but that is a runtime guarantee; the compiler cannot verify it at compile time. That is why you can't pass references to std::thread::spawn, so you need an Arc (or scoped threads — std::thread::scope, stable since Rust 1.63 — which do allow borrowing locals).
  • Calling .join() also ensures that no read of a is in progress when you modify it, but the compiler can't check this either.
  • You mutate parts of the same vector from different threads, which is not safe unless you take extra care, so you need a little bit of unsafe code. An alternative would be to split the vector into two vectors (not slices) and join them afterwards, but that requires extra copying.

I've ended up with something like this:

use std::sync::{Mutex, Arc, RwLock};

/// Owns a `Vec<i32>` and exposes its two parts (`[..2]` and `[2..]`) through
/// raw pointers, so each part can be updated from a different thread while
/// the data stays in a single allocation.
///
/// The pointers are captured once in `new` and remain valid because `vec` is
/// never resized, and its elements are never touched through `vec` itself,
/// until `consume` takes the holder by value.
struct VectorHolder {
    /// Backing storage; only handed back out by `consume`.
    vec: Vec<i32>,
    /// Pointer to `vec[..2]`; the mutex serializes callers of `op2`.
    slice1_ptr: Mutex<*mut [i32]>,
    /// Pointer to `vec[2..]`; the mutex serializes callers of `op1`.
    slice2_ptr: Mutex<*mut [i32]>,
}

// `Debug` is written by hand instead of derived: a derived impl would read
// the elements of `vec` through `&self`, which races with `op1`/`op2` writing
// the same elements from another thread. Only the length is reported, which
// does not touch the element buffer.
impl std::fmt::Debug for VectorHolder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VectorHolder")
            .field("len", &self.vec.len())
            .finish_non_exhaustive()
    }
}

impl VectorHolder {
    /// Takes ownership of `vec` and records raw pointers to `vec[..2]` and
    /// `vec[2..]`.
    ///
    /// # Panics
    ///
    /// Panics if `vec` has fewer than two elements.
    pub fn new(mut vec: Vec<i32>) -> Self {
        let (slice1, slice2) = vec.split_at_mut(2);
        let slice1_ptr = slice1 as *mut [i32];
        let slice2_ptr = slice2 as *mut [i32];
        // Moving the `Vec` into the struct moves only its header; the heap
        // buffer the pointers refer to stays where it is.
        Self {
            vec,
            slice1_ptr: Mutex::new(slice1_ptr),
            slice2_ptr: Mutex::new(slice2_ptr),
        }
    }

    /// XORs the first two elements of the second part (`vec[2]`, `vec[3]`)
    /// with `a[2]` and `a[3]`.
    ///
    /// Concurrent calls are serialized by the mutex around the pointer, and
    /// this method may run at the same time as `op2` because the two touch
    /// disjoint parts of the vector.
    ///
    /// # Panics
    ///
    /// Panics if `a` or the held vector has fewer than four elements, or if
    /// the mutex is poisoned.
    pub fn op1(&self, a: &[i32]) {
        let mut guard = self.slice2_ptr.lock().unwrap();
        // SAFETY: the pointer was derived from `vec[2..]` in `new` and the
        // buffer is neither reallocated nor accessed through `vec` while the
        // holder is shared. Holding the mutex guard makes this the only live
        // reference to that part of the buffer.
        let b2 = unsafe { &mut **guard };
        for i in 2..4 {
            b2[i - 2] ^= a[i]; // mutably borrow b2, immutably borrow a
        }
    }

    /// XORs the first part (`vec[0]`, `vec[1]`) with `a[0]` and `a[1]`.
    ///
    /// Concurrent calls are serialized by the mutex around the pointer, and
    /// this method may run at the same time as `op1` because the two touch
    /// disjoint parts of the vector.
    ///
    /// # Panics
    ///
    /// Panics if `a` has fewer than two elements or if the mutex is poisoned.
    pub fn op2(&self, a: &[i32]) {
        let mut guard = self.slice1_ptr.lock().unwrap();
        // SAFETY: the pointer was derived from `vec[..2]` in `new` and the
        // buffer is neither reallocated nor accessed through `vec` while the
        // holder is shared. Holding the mutex guard makes this the only live
        // reference to that part of the buffer.
        let b1 = unsafe { &mut **guard };
        for i in 0..2 {
            b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
        }
    }

    /// Gives the vector back. Taking `self` by value guarantees no other
    /// thread can still be inside `op1`/`op2`.
    pub fn consume(self) -> Vec<i32> {
        self.vec
    }
}

// SAFETY: the raw pointers refer to the heap buffer owned by `vec`, which
// travels with the holder, so they stay valid on whichever thread owns it.
unsafe impl Send for VectorHolder {}
// SAFETY: the only element access available through `&self` is `op1`/`op2`;
// each takes its own mutex and the two write to disjoint parts of the buffer.
unsafe impl Sync for VectorHolder {}

/// Runs one round of the XOR step on `b` using two threads, then sorts `b`.
///
/// `b[0]` and `b[1]` are updated on the calling thread and `b[2]` and `b[3]`
/// on a scoped worker thread; element `i` of `b` is XOR-ed with element `i`
/// of `a`. Scoped threads are joined before `std::thread::scope` returns, so
/// the worker can borrow the halves of `b` and the contents of `a` directly:
/// no `unsafe` code and no per-call heap allocation are needed.
///
/// A single read guard on `a` is held for the whole parallel section and is
/// released before sorting.
///
/// # Panics
///
/// Panics if `a` or `b` has fewer than four elements, or if the lock around
/// `a` is poisoned.
pub fn operations_on_b(mut b: Vec<i32>, a: Arc<RwLock<Vec<i32>>>) -> Vec<i32> {
    {
        let a_guard = a.read().unwrap();
        let a_vals: &[i32] = a_guard.as_slice();

        // Two non-overlapping mutable views, one per thread.
        let (b1, b2) = b.split_at_mut(2);

        std::thread::scope(|scope| {
            //only 1 worker for example purposes
            scope.spawn(move || {
                for i in 2..4 {
                    b2[i - 2] ^= a_vals[i];
                }
            });

            for i in 0..2 {
                b1[i] ^= a_vals[i];
            }
            // Leaving the scope joins the worker (and propagates its panic).
        });
    }

    b.sort();
    b
}

/// Parallel counterpart of the sequential loop: 100 rounds in which
/// `operations_on_b` updates and sorts `b`, after which the main thread
/// overwrites every element of `a` with `1`. Returns `b` after the last round.
fn parallel_fn() -> Vec<i32> {
    let a = Arc::new(RwLock::new(vec![1, 2, 3, 4]));
    let mut b = vec![0; 4];

    for _ in 0..100 {
        // `b` is moved into the call and handed back, so it is never copied.
        b = operations_on_b(b, Arc::clone(&a));

        // Only the main thread writes to `a`; the write guard is released at
        // the end of this statement.
        a.write().unwrap().iter_mut().for_each(|e| *e = 1);
    }

    b
}


/// Entry point: runs the parallel computation and prints the resulting
/// vector using its `Debug` representation.
fn main() {
    let result = parallel_fn();
    println!("{:?}", result);
}
  • Related