Home > Software engineering >  Multithreaded hashmap insertion using tokio and Mutex
Multithreaded hashmap insertion using tokio and Mutex

Time:08-30

[PLAYGROUND]

I need to execute parallel calls(in this example 2) once and insert result values into the same mutable HashMap defined earlier, then only after all are completed (running once) the program progresses further and extracts the HashMap from Mutex<>.

let mut REZN:Mutex<HashMap<u8, (u128, u128)>> = Mutex::new(HashMap::new());

let b=vec![0, 1, 2, (...), 4999, 5000];

let payload0 = &b[0..2500];
let payload1 = &b[2500..5000];

tokio::spawn(async move{
    let result_ = //make calls
    for (i,j) in izip!(payload0.iter(), result_.iter()){
        REZN.lock().unwrap().insert(*i, (j[0], j[1]));
    };
});

tokio::spawn(async move{
    let result_ = //make calls
    for (i,j) in izip!(payload1.iter(), result_.iter()){
        REZN.lock().unwrap().insert(*i, (j[0], j[1]));
    };
});

I'm just starting with multithreading in Rust. Both the hashmap and the object used to make calls are moved into the spawned thread. I read that cloning should be done and I tried it, but the compiler says:

&mut REZN.lock().unwrap().clone().insert(*i, (j[0], j[1]));
     | |---- use occurs due to use in generator 
  • what does that mean? what's a generator in that context?

and

value moved here, in previous iteration of loop errors are abundant.

  • I don't want it to do more than 1 iteration. How can I put a stop once each is done its job inserting into the HashMap?

Later, I'm trying to escape the lock/extract the Hashmap from inside of Mutex<>:

 let mut REZN:HashMap<u8, (u128, u128)> = *REZN.lock().unwrap();
     |                                                ^^^^^^^^^^^^^^^^^^^^^
     |                                                |
     |                                                move occurs because value has type `HashMap<u8, (u128, u128)>`, which does not implement the `Copy` trait
     |                                                help: consider borrowing here: `&*REZN.lock().unwrap()`

But if I borrow here errors appear elsewhere. Could this work though if there was no conflict? I read that Mutex is removed automatically when threads are done working on it, but I don't know how that happens exactly on a lower level (if you can reccomend resources I'll be glad to read up on that).

I tried clone() both in the threads and the later attempt of extracting the HashMap, and they fail unfortunately. Am I doing it wrong?

Finally, how can I await until both are completed to proceed further in my program?

CodePudding user response:

what does that mean? what's a generator in that context?

An async block compiles to a generator.

I tried clone() both in the threads and the later attempt of extracting the HashMap, and they fail unfortunately. Am I doing it wrong?

Yes. If you clone inside the thread/tasks, then first the map is moved into the routine then it's cloned when used. That's not helpful, because once the map has been moved it can't be used from the caller anymore.

A common solution to that is the "capture clause pattern", where you use an outer block which can then do the setup for a closure or inner block:

tokio::spawn({
    let REZN = REZN.clone();
    async move{
        let result_ = [[6, 406], [7,407]];//make calls
        for (i,j) in izip!(payload0.iter(), result_.iter()){
            REZN.lock().unwrap().insert(*i, (j[0], j[1]));
        };
    });

This way only the cloned map will be moved into the closure.

However this is not very useful, or efficient, or convenient: by cloning the map, each tasks gets its own map (a copy of the original), and you're left with just the unmodified original. This means there's nothing to extract, because in practice it's as if nothing had happened. This also makes the mutex redundant: since each tasks has its own (copy of the) map, there's no need for synchronisation because there's no sharing.

The solution is to use shared ownership primitives, namely Arc:

    let  REZN: Arc<Mutex<HashMap<u8, (u128, u128)>>> = Arc::new(Mutex::new(HashMap::new()));

this way you can share the map between all coroutines, and the mutex will synchronise access: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=33ce606b1ab7c2dfc7f4897de69855ef

Alternatively, Rust threads and tasks can return values, so each task could create a map internally, return it after it's done, and the parent can get those maps and merge them:

let task1 = tokio::spawn(async move {
    let mut map = Map::new();
    let result_ = [[6, 406], [7, 407]]; //make calls
    for (i, j) in izip!(payload0.iter(), result_.iter()) {
        map.insert(*i, (j[0], j[1]));
    }
    map
});

let task2 = tokio::spawn(async move {
    let mut map = Map::new();
    let result_ = [[6, 106], [7, 907]]; //make calls
    for (i, j) in izip!(payload1.iter(), result_.iter()) {
        map.insert(*i, (j[0], j[1]));
    }
    map
});
match tokio::join![task1, task2] {
    (Ok(mut m1), Ok(m2)) => {
        m1.extend(m2.into_iter());
        eprintln!("{:?}", m1);
    }
    e => eprintln!("Error {:?}", e),
}

This has a higher number of allocations, but there is no synchronisation necessary between the workers.

CodePudding user response:

A mutex will give you safe multithread access, but you also need to share the ownership of the mutex itself between those threads.

If you used scoped threads, you could just use a &Mutex<HashMap<...>>, but if you want or need to use normal tokio spawned tasks, you cannot pass a reference because normal tokio tasks require the callback to be 'static, and a reference to a local variable will not comply. In this case the idiomatic solution is to use an Arc<Mutex<HashMap<...>>>.

let REZN:Mutex<HashMap<u8, (u128, u128)>> = Mutex::new(HashMap::new());
let REZN = Arc::new(REZN);

And then pass a clone of the Arc to the spawned tasks. There are several ways to write that but my favourite currently is this:

let task1 = {
    let REZN = Arc::clone(&REZN);
    tokio::spawn(async move{
        //...
    })
};

A little known fact about Arc is that you can extract the inner value using Arc::try_unwrap(), but that will only work if your Arc is the only one pointing to this value. In your case you can ensure that by waiting for (joining) the spawned tasks.

task1.await.unwrap();
task2.await.unwrap();

And then you can unwrap the Arc and the Mutex with this nice looking line:

let REZN = Arc::try_unwrap(REZN).unwrap().into_inner().unwrap();

These four unwraps are for the following:

  • Arc::try_unwrap(REZN) gets to the inner value of the Arc.
  • But only if this is the only clone of the Arc so we have a Result that we have to unwrap().
  • We get a Mutex that we unwrap using into_inner(). Note that we do not lock the mutex to extract the inner value: since into_inner() requires the mutex by value we are sure that it is not borrowed anywhere and we are sure we have exclusive access.
  • But this can fail too if the mutex is poisoned, so another unwrap() to get the real value. This is not needed if you use tokio::Mutex instead, because they don't have poisoning.

You can see the whole thing in this playground.

  • Related