How to create threads in a for loop and get the return value from each?

Time:04-02

I am writing a program that pings each of a set of targets 100 times and stores every RTT value returned by the ping in a vector, giving me one set of RTT values per target. With n targets, I would like all of the pinging to be done concurrently. The Rust code looks like this:

    let mut sample_rtts_map = HashMap::new();

    for addr in targets.to_vec() {
        let mut sampleRTTvalues: Vec<f32> = vec![];
        //sample_rtts_map.insert(addr, sampleRTTvalues);

        thread::spawn(move || {
            while sampleRTTvalues.len() < 100 {
                let sampleRTT = ping(addr);
                sampleRTTvalues.push(sampleRTT);
                // thread::sleep(Duration::from_millis(5000));
            }
        });
    }

The hashmap is used to tell which vector of values belongs to which target. The problem is, how do I retrieve the updated sampleRTTvalues from each thread after the thread is done executing? I would like something like:

let (name, sampleRTTvalues) = thread::spawn(...)

Here name would be the name of the thread and sampleRTTvalues the vector. However, since I'm creating the threads in a for loop, every thread is instantiated the same way, so how do I differentiate them?

Is there a better way to do this? I've looked into schedulers, futures, etc., but it seems my case can be handled with simple threads.

CodePudding user response:

I got the desired behavior with the following code:

use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::mpsc;
use std::thread;

use rand::Rng;

const RTT_ONE: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const RTT_TWO: IpAddr = IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
// I don't know how IP addresses work; forgive me if this is invalid, but you get the idea.
const RTT_THREE: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1));

// Stand-in for the real ping: returns a random RTT so the example runs.
fn ping(_address: IpAddr) -> f32 {
    rand::thread_rng().gen_range(5.0..107.0)
}

fn main() {
    let targets = [RTT_ONE, RTT_TWO, RTT_THREE];
    let mut sample_rtts_map: HashMap<IpAddr, Vec<f32>> = HashMap::new();
    for addr in targets.into_iter() {
        let (sample_values, moved_values) = mpsc::channel();
        let mut sampleRTTvalues: Vec<f32> = vec![];
        thread::spawn(move || {
            while sampleRTTvalues.len() < 100 {
                let sampleRTT = ping(addr);
                sampleRTTvalues.push(sampleRTT);
                //thread::sleep(Duration::from_millis(5000));
            }
            // Hand the finished vector back to the main thread.
            // Without this send, recv() below would block forever.
            sample_values.send(sampleRTTvalues).unwrap();
        });
        // Blocks until this target's thread has sent its vector.
        sample_rtts_map.insert(addr, moved_values.recv().unwrap());
    }
}

Note that use rand::Rng can be removed in the real program, as it is only there so the example works. The code passes the data from the spawned thread to the main thread through a channel, and recv waits until the data is ready before it is added to the hash map. Because recv is called inside the loop, the main thread waits for each target to finish before spawning the thread for the next one. If this is problematic (takes a long time, etc.), you can use try_recv instead of recv: it returns a Result immediately, holding the value if it is ready or a recoverable error if it is not.
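The try_recv approach mentioned above could look roughly like the following sketch. It is an illustration rather than part of the original answer: ping is a hypothetical stub that returns a fixed value, collect_rtts is a made-up helper name, targets are plain strings, and the 1 ms sleep stands in for whatever other work the main thread would do between polls. All threads are spawned before any result is collected, so the targets are pinged concurrently.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the real ping; always returns a fixed RTT.
fn ping(_addr: &str) -> f32 {
    1.0
}

// Spawns one thread per target, then polls every receiver with `try_recv`
// until each thread has delivered its vector of samples.
fn collect_rtts(targets: &[&'static str], samples: usize) -> HashMap<&'static str, Vec<f32>> {
    let mut pending: Vec<(&'static str, Receiver<Vec<f32>>)> = Vec::new();
    for &addr in targets {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let rtts: Vec<f32> = (0..samples).map(|_| ping(addr)).collect();
            // Ignore the error case where the receiver is already gone.
            let _ = tx.send(rtts);
        });
        pending.push((addr, rx));
    }

    let mut results = HashMap::new();
    while !pending.is_empty() {
        // Keep only the receivers that have not produced a value yet.
        pending.retain(|(addr, rx)| match rx.try_recv() {
            Ok(rtts) => {
                results.insert(*addr, rtts);
                false
            }
            Err(TryRecvError::Empty) => true, // not ready yet, poll again later
            Err(TryRecvError::Disconnected) => false, // thread ended without sending
        });
        // The main thread could do other work here instead of sleeping.
        thread::sleep(Duration::from_millis(1));
    }
    results
}

fn main() {
    let map = collect_rtts(&["a", "b"], 100);
    for (addr, rtts) in &map {
        println!("{}: {} samples", addr, rtts.len());
    }
}
```

Each receiver is paired with its address when the thread is spawned, which is what lets the main thread tell the results apart even though every thread runs the same closure.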

CodePudding user response:

You can use a std::sync::mpsc channel to collect your data:

use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::thread;

fn ping(_: &str) -> f32 { 0.0 }

fn main() {
    let targets = ["a", "b"]; // just for example
    
    let mut sample_rtts_map = HashMap::new();
    let (tx, rx) = channel();

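    for addr in targets {
        // Each thread gets its own clone of the sender.
        let tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..100 {
                let sampleRTT = ping(addr);
                // Tag every sample with its address so main can sort them.
                tx.send((addr, sampleRTT)).unwrap();
            }
        });
    }

    // Drop the original sender; only the threads' clones remain.
    drop(tx);
    // The loop exits once every thread's sender has been dropped.
    while let Ok((addr, sampleRTT)) = rx.recv() {
        sample_rtts_map.entry(addr).or_insert_with(Vec::new).push(sampleRTT);
    }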
    
    println!("sample_rtts_map: {:?}", sample_rtts_map);
}

This runs all pinging threads concurrently and collects the data in the main thread as it arrives, so no locks are needed. Do not forget to drop the original sender in the main thread after cloning it for the pinging threads; otherwise rx.recv() never sees the channel close and the main thread hangs forever.
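As an alternative to channels (not part of either answer above), thread::spawn returns a JoinHandle, and calling join on it yields whatever the closure returned. This is close to the `let (name, sampleRTTvalues) = thread::spawn(...)` form asked for in the question. The sketch below assumes a hypothetical ping stub with a fixed return value, string targets, and a made-up helper name, collect_rtts_by_join.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical stand-in for the real ping; always returns a fixed RTT.
fn ping(_addr: &str) -> f32 {
    1.0
}

fn collect_rtts_by_join(
    targets: &[&'static str],
    samples: usize,
) -> HashMap<&'static str, Vec<f32>> {
    // Spawn every thread first so that all targets are pinged concurrently.
    // Collecting into a Vec forces all spawns to happen before any join.
    let handles: Vec<_> = targets
        .iter()
        .map(|&addr| {
            let handle = thread::spawn(move || {
                // The closure's return value becomes the result of `join`.
                (0..samples).map(|_| ping(addr)).collect::<Vec<f32>>()
            });
            (addr, handle)
        })
        .collect();

    // Join afterwards; each handle is still paired with its address.
    handles
        .into_iter()
        .map(|(addr, handle)| (addr, handle.join().expect("ping thread panicked")))
        .collect()
}

fn main() {
    let map = collect_rtts_by_join(&["a", "b"], 100);
    for (addr, rtts) in &map {
        println!("{}: {} samples", addr, rtts.len());
    }
}
```

Keeping the address next to its handle answers the "how do I differentiate them" part. Compared with the channel version, results only become available once a thread has finished, not sample by sample.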
