Home > Net >  Lockless processing of non overlapping non contiguous indexes by multiple threads in Rust
Lockless processing of non overlapping non contiguous indexes by multiple threads in Rust

Time:10-02

I am practicing rust and decided to create a Matrix ops/factorization project.

Basically I want to be able to process the underlying vector in multiple threads. Since I will be providing each thread non-overlapping indexes (which may or may not be contiguous) and the threads will be joined before the end of whatever function created them, there is no need for a lock /synchronization.

I know that there are several crates that can do this, but I would like to know if there is a relatively idiomatic crate-free way to implement it on my own.

The best I could come up with is (simplified the code a bit):

use std::thread;

//This represents the Matrix
#[derive(Debug, Clone)]
pub struct MainStruct {
    pub data: Vec<f64>,
}
//This is the bit that will be shared by the threads, 
//ideally it should have its lifetime tied to that of MainStruct
//but i have no idea how to make phantomdata work in this case
#[derive(Debug, Clone)]
pub struct SliceTest {
    pub data: Vec<SubSlice>,
}
//This struct is to hide *mut f64 to allow it to be shared to other threads
#[derive(Debug, Clone)]
pub struct SubSlice {
    pub data: *mut f64,
}

impl MainStruct {
    pub fn slice(&mut self) -> (SliceTest, SliceTest) {
        let mut out_vec_odd: Vec<SubSlice> = Vec::new();

        let mut out_vec_even: Vec<SubSlice> = Vec::new();

        unsafe {
            let ptr = self.data.as_mut_ptr();

            for i in 0..self.data.len() {
                let ptr_to_push = ptr.add(i);
                //Non contiguous idxs
                if i % 2 == 0 {
                    out_vec_even.push(SubSlice{data:ptr_to_push});
                } else {
                    out_vec_odd.push(SubSlice{data:ptr_to_push});
                }
            }
        }

        (SliceTest{data: out_vec_even}, SliceTest{data: out_vec_odd})
    }
}

impl SubSlice {
    pub fn set(&self, val: f64) {
        unsafe {*(self.data) = val;}
    }
}
unsafe impl Send for SliceTest {}
unsafe impl Send for SubSlice {}

fn main() {
    let mut maindata = MainStruct {
        data: vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
    };

    let (mut outvec1, mut outvec2) = maindata.slice();
    let mut threads = Vec::new();

    threads.push(
        thread::spawn(move || {
            for i in 0..outvec1.data.len() {
                outvec1.data[i].set(999.9);
            }
        })
    );
    threads.push(
        thread::spawn(move || {
            for i in 0..outvec2.data.len() {
                outvec2.data[i].set(999.9);
            }
        })
    );

    for handles in threads {
        handles.join();
    }

    println!("maindata = {:?}", maindata.data);
}

EDIT: Following kmdreko suggestion below, got the code to work exactly how I wanted it without using unsafe code, yay!

Of course in terms of performance it may be cheaper to copy the f64 slices than to create mutable reference vectors unless your struct is filled with other structs instead of f64

extern crate crossbeam;
use crossbeam::thread;

#[derive(Debug, Clone)]
pub struct Matrix {
    data: Vec<f64>,
    m: usize, //number of rows
    n: usize, //number of cols
}

...

impl Matrix {
    ...
    pub fn get_data_mut(&mut self) -> &mut Vec<f64> {
        &mut self.data
    }

    pub fn calculate_idx(max_cols: usize, i: usize, j: usize) -> usize {
        let actual_idx = j   max_cols * i;
        actual_idx
    }
    //Get individual mutable references for contiguous indexes (rows)
    pub fn get_all_row_slices(&mut self) -> Vec<Vec<&mut f64>> {
        let max_cols = self.max_cols();
        let max_rows = self.max_rows();
        let inner_data = self.get_data_mut().chunks_mut(max_cols);
        let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_rows);

        for chunk in inner_data {
            let row_vec = chunk.iter_mut().collect();
            out_vec.push(row_vec);
        }

        out_vec
    }
    //Get mutable references for disjoint indexes (columns)
    pub fn get_all_col_slices(&mut self) -> Vec<Vec<&mut f64>> {
        let max_cols = self.max_cols();
        let max_rows = self.max_rows();
        let inner_data = self.get_data_mut().chunks_mut(max_cols);
        let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_cols);

        for _ in 0..max_cols {
            out_vec.push(Vec::with_capacity(max_rows));
        }

        let mut inner_idx = 0;

        for chunk in inner_data {
            let row_vec_it = chunk.iter_mut();

            for elem in row_vec_it {
                out_vec[inner_idx].push(elem);
                inner_idx  = 1;
            }

            inner_idx = 0;
        }

        out_vec
    }
    ...
}

fn test_multithreading() {
    fn test(in_vec: Vec<&mut f64>) {
        for elem in in_vec {
            *elem = 33.3;
        }
    }

    fn launch_task(mat: &mut Matrix, f: fn(Vec<&mut f64>)) {

        let test_vec = mat.get_all_row_slices();
        thread::scope(|s| {
            for elem in test_vec.into_iter() {
                s.spawn(move |_| {
                        println!("Spawning thread...");
                        f(elem);
                    });
            }
        }).unwrap();
    }

    let rows = 4;
    let cols = 3;
    //new function code omitted, returns Result<Self, MatrixError>
    let mut mat = Matrix::new(rows, cols).unwrap()

    launch_task(&mut mat, test);

    for i in 0..rows {
        for j in 0..cols {
            //Requires index trait implemented for matrix
            assert_eq!(mat[(i, j)], 33.3);
        }
    }
}

CodePudding user response:

This API is unsound. Since there is no lifetime annotation binding SliceTest and SubSlice to the MainStruct, they can be preserved after the data has been destroyed and if used would result in use-after-free errors.

Its easy to make it safe though; you can use .iter_mut() to get distinct mutable references to your elements:

pub fn slice(&mut self) -> (Vec<&mut f64>, Vec<&mut f64>) {
    let mut out_vec_even = vec![];
    let mut out_vec_odd = vec![];
    
    for (i, item_ref) in self.data.iter_mut().enumerate() {
        if i % 2 == 0 {
            out_vec_even.push(item_ref);
        } else {
            out_vec_odd.push(item_ref);
        }
    }

    (out_vec_even, out_vec_odd)
}

However, this surfaces another problem: thread::spawn cannot hold references to local variables. The threads created are allowed to live beyond the scope they're created in, so even though you did .join() them, you aren't required to. This was a potential issue in your original code as well, just the compiler couldn't warn about it.

There's no easy way to solve this. You'd need to use a non-referential way to use data on the other threads, but that would be using Arc, which doesn't allow mutating its data, so you'd have to resort to a Mutex, which is what you've tried to avoid.

I would suggest reaching for scope from the crossbeam crate, which does allow you to spawn threads that reference local data. I know you've wanted to avoid using crates, but this is the best solution in my opinion.

See a working version on the playground.

See:

  • Related