Home > OS >  Using VecDeque in Multithreading
Using VecDeque in Multithreading

Time:07-07

I am trying to create a multithreading app using VecDeque. I wanted to use it as a shared Queue with readwrite permissions for all threads. I have the following "example / test" code:

use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1  =1;
            thread_1_queue.push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue);

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2  =1;
            thread_2_queue.push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue);

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload);

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=c4e58a9e99fac76b1db9e0ef24eeca6e (Beware that it will run endless)

My Problem is now that the Clones in the threads won't update the Mainqueue. Now each thread has his own Queue instead of have one shared. As shown here in the result:

Thread #1: [0, 1]
MainQueue: [0]
Thread #2: [0, 11]
Thread #1: [0, 1, 2]
Thread #2: [0, 11, 12]
MainQueue: [0]
MainQueue: [0]
Thread #2: [0, 11, 12, 13]
Thread #1: [0, 1, 2, 3]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14]
Thread #1: [0, 1, 2, 3, 4]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15]
Thread #1: [0, 1, 2, 3, 4, 5]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16]
Thread #1: [0, 1, 2, 3, 4, 5, 6]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8]
MainQueue: [0]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
MainQueue: [0]

CodePudding user response:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1  = 1;
            thread_1_queue.lock().unwrap().push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2  = 1;
            thread_2_queue.lock().unwrap().push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.lock().unwrap().capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload.lock().unwrap());

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...

Explanation

Arc creates a multi-threaded reference counter with which you can share a single object to multiple threads. Note that the content of Arc is always immutable because multiple mutable references to the same object are never allowed in Rust.

That's why you need a Mutex internally. It creates what is called interior mutability. That means, you can use it to temporarily get mutable access to the object, while it makes sure that the mutable access doesn't collide with other threads.

Further, this means that when a different thread calls lock() while it is already locked, it will block the other thread. This is what is called a bottleneck and will limit the amount of speedup you will get from your multithreading.

Further, be aware that between two lock()s, the content of the queue could change. So if it's important that something happens atomically to the queue, you need to keep the queue locked for the entire duration of that action, which further reduces your speedup.

Further bugs

  • I think you mix up .capacity() and .len().
  • You should do something with the Result of .join(), which I here will simply .unwrap().
  • .len() == 10 won't work in a multi-threaded scenario, because it could jump directly from 9 to 11. So for multi-threaded scenarios, it's better to do >= 10, which will always work.

Fixed code that doesn't run forever:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1  = 1;
            thread_1_queue.lock().unwrap().push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2  = 1;
            thread_2_queue.lock().unwrap().push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.lock().unwrap().len() >= 10 {
            break;
        }

        println!("MainQueue: {:?}", workload.lock().unwrap());

        thread::sleep(some_time);
    }

    thread_1.join().unwrap();
    thread_2.join().unwrap();
}
Thread #1: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #1: [0, 1, 11, 2]
Thread #2: [0, 1, 11, 2, 12]
MainQueue: [0, 1, 11, 2, 12]
Thread #1: [0, 1, 11, 2, 12, 3]
MainQueue: [0, 1, 11, 2, 12, 3]
Thread #2: [0, 1, 11, 2, 12, 3, 13]
MainQueue: [0, 1, 11, 2, 12, 3, 13]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]
  • Related