Home > Blockchain >  How can I share a Vector between 2 threads?
How can I share a Vector between 2 threads?

Time:07-23

I am pretty new to Rust, and cannot manage to keep both Arcs values updated in both threads I'm spawning. The idea would be that one thread loops over received events and when it receives one, updates the object, which the other thread constantly watches. How can I achieve that in Rust, or if this method isn't adequate, would there be a better way to do it ? (The concrete idea would be one thread listening for MIDI events and the other one re-rendering on a LED strip the notes received)

Here's what I currently have:

main.rs

mod functions;
mod structs;
use crate::functions::*;
use crate::structs::*;
use portmidi as pm;
use rs_ws281x::{ChannelBuilder, ControllerBuilder, StripType};
use std::sync::{Arc, Mutex};
use std::{fs, thread, time};

const MIDI_TIMEOUT: u64 = 10;
const MIDI_CHANNEL: usize = 0;

#[tokio::main]
async fn main() {
    let config: Arc<std::sync::Mutex<Config>> = Arc::new(Mutex::new(
        toml::from_str(&fs::read_to_string("config.toml").unwrap()).unwrap(),
    ));
    let config_midi = config.clone();
    let config_leds = config.clone();

    let leds_status = Arc::new(Mutex::new(vec![0; config.lock().unwrap().leds.num_leds]));
    let leds_status_midi = Arc::clone(&leds_status);
    let leds_status_leds = Arc::clone(&leds_status);

    thread::spawn(move || {
        let config = config_midi.lock().unwrap();
        let midi_context = pm::PortMidi::new().unwrap();
        let device_info = midi_context
            .device(config.midi.id)
            .expect(format!("Could not find device with id {}", config.midi.id).as_str());
        println!("Using device {}) {}", device_info.id(), device_info.name());
        let input_port = midi_context
            .input_port(device_info, config.midi.buffer_size)
            .expect("Could not create input port");
        let mut leds_status = leds_status_midi.lock().unwrap();

        loop {
            if let Ok(_) = input_port.poll() {
                if let Ok(Some(events)) = input_port.read_n(config.midi.buffer_size) {
                    for event in events {
                        let event_type =
                            get_midi_event_type(event.message.status, event.message.data2);

                        match event_type {
                            MidiEventType::NoteOn => {
                                let key = get_note_position(event.message.data1, &config);
                                leds_status[key] = 1;
                            }
                            MidiEventType::NoteOff => {
                                let key = get_note_position(event.message.data1, &config);
                                leds_status[key] = 0;
                            }
                            _ => {}
                        }
                    }
                }
            }
            thread::sleep(time::Duration::from_millis(MIDI_TIMEOUT));
        }
    });
    thread::spawn(move || {
        let config = config_leds.lock().unwrap();
        let mut led_controller = ControllerBuilder::new()
            .freq(800_000)
            .dma(10)
            .channel(
                MIDI_CHANNEL,
                ChannelBuilder::new()
                    .pin(config.leds.pin)
                    .count(config.leds.num_leds as i32)
                    .strip_type(StripType::Ws2812)
                    .brightness(config.leds.brightness)
                    .build(),
            )
            .build()
            .unwrap();

        loop {
            let leds_status = leds_status_leds.lock().unwrap();

            print!("\x1b[2J\x1b[1;1H");
            println!(
                "{:?}",
                leds_status.iter().filter(|x| (**x) > 0).collect::<Vec<_>>()
            );
        }
    });
}

functions.rs

use crate::structs::MidiEventType;

pub fn get_note_position(note: u8, config: &crate::structs::Config) -> usize {
    let mut note_offset = 0;
    for i in 0..config.leds.offsets.len() {
        if note > config.leds.offsets[i][0] {
            note_offset = config.leds.offsets[i][1];
            break;
        }
    }
    note_offset -= config.leds.shift;
    let note_pos_raw = 2 * (note - 20) - note_offset;
    config.leds.num_leds - (note_pos_raw as usize)
}
pub fn get_midi_event_type(status: u8, velocity: u8) -> MidiEventType {
    if status == 144 && velocity > 0 {
        MidiEventType::NoteOn
    } else if status == 128 || (status == 144 && velocity == 0) {
        MidiEventType::NoteOff
    } else {
        MidiEventType::ControlChange
    }
}

structs.rs

use serde_derive::Deserialize;

#[derive(Deserialize, Debug)]
pub struct Config {
    pub leds: LedsConfig,
    pub midi: MidiConfig,
}

#[derive(Deserialize, Debug)]
pub struct LedsConfig {
    pub pin: i32,
    pub num_leds: usize,
    pub brightness: u8,
    pub offsets: Vec<Vec<u8>>,
    pub shift: u8,
    pub fade: i8,
}

#[derive(Deserialize, Debug)]
pub struct MidiConfig {
    pub id: i32,
    pub buffer_size: usize,
}
#[derive(Debug)]
pub enum MidiEventType {
    NoteOn,
    NoteOff,
    ControlChange,
}

Thank you very much !

CodePudding user response:

The idea would be that one thread loops over received events and when it receives one, updates the object, which the other thread constantly watches.

That's a good way to do it, particularly if one of the threads needs to be near-realtime (e.g. live audio processing). You can use channels to achieve this. You transfer the sender to one thread and the receiver to another. In a realtime scenario, the receiver can loop until try_recv errs with Empty (limiting to some number of iterations to prevent starvation of the processing code). For example, something like this, given a r: Receiver:

// Process 100 messages max to not starve the thread of the other stuff
// it needs to be doing.
for _ in 0..100 {
    match r.try_recv() {
        Ok(msg) => { /* Process msg, applying it to the current state */ },
        Err(TryRecvError::Empty) => break,
        Err(TryRecvError::Disconnected) => {
            // The sender is gone, maybe this is our signal to terminate?
            return;
        },
    }
}

Alternatively, if one thread needs to act only when a message is received, it can simply iterate the receiver, which will continue to loop as long as messages are received and the channel is open:

for msg in r {
    // Handle the message
}

It really is that simple. If the channel is empty but there are senders alive, it will block until a message is received. Once all senders are gone, the loop will terminate.

A channel can convey messages of exactly one type; if only one kind of message needs to be sent, you can use a struct. Otherwise, an enum with variants for each kind of message works well.

Given the sending side of the channel, s: Sender, you just s.send(your_message_value).

Another option would be to create an Arc<Mutex<_>>, which it looks like you are doing in your sample code. This way is fine if the lock contention is not too high, but this can inhibit the ability of both threads to run concurrently, which is often the goal of multithreading. Channels tend to work better in message-passing scenarios because there isn't a need for a mutual exclusion lock.


As a side note, you are using Tokio with an async main(), but you never actually do anything with any futures, so there's no reason to even use Tokio in this code.

  • Related