use crossbeam::channel;
use std::thread;
// (unrelated code...)
let (tx3, rx3) = channel::unbounded();
let rx4 = rx3.clone();
let a = vec!["apple", "orange", "banana", "watermelon"];
tx3.send(a).unwrap();
let handle_c = thread::spawn(move || {
for msg in rx3 {
for item in msg {
println!("Child thread c: Received {}", item);
};
};
});
let handle_d = thread::spawn(move || {
for msg in rx4 {
for item in msg {
println!("Child thread d: Received {}", item);
};
};
});
handle_c.join().unwrap();
handle_d.join().unwrap();
The idea is that Child thread c would print each string in vector a, exit, then Child thread d would print each string in vector a and exit.
However, Child thread c doesn't exit after printing everything it received and Child thread d never gets to print anything. How can I make Child thread c exit?
I've tried dropping rx3
but that doesn't work (it tried to access rx3 again). I've also searched for how to kill a child thread, but apparently there isn't a way.
CodePudding user response:
Child thread c doesn't exit after printing everything it received
That is because it could still theoretically receive messages since the sender, tx3
is still alive and in scope. To close the channel, you need to ensure that all senders have been destroyed. You can do that simply by dropping it:
// close the channel
drop(tx3);
// then the for-loops in the spawned threads will end
handle_c.join().unwrap();
handle_d.join().unwrap();
Child thread d never gets to print anything
Funny that when I ran it, it was thread d that printed everything while thread c did nothing. So your behavior is not deterministic which thread will receive which message because the crossbeam channel will not duplicate the message across the receivers.
You'll need something typically called a "broadcast channel" or "fan-out queue" or variations thereof. I spend too much time in the async
world so I only have experience with Tokio's broadcast
module. However, @cafce25 has mentioned the bus crate as a potential solution:
use bus::Bus;
use std::thread;
fn main() {
let mut bus = Bus::new(10);
let rx1 = bus.add_rx();
let rx2 = bus.add_rx();
let a = vec!["apple", "orange", "banana", "watermelon"];
bus.broadcast(a);
let handle_c = thread::spawn(move || {
for msg in rx1 {
for item in msg {
println!("Child thread c: Received {}", item);
}
}
});
let handle_d = thread::spawn(move || {
for msg in rx2 {
for item in msg {
println!("Child thread d: Received {}", item);
}
}
});
drop(bus);
handle_c.join().unwrap();
handle_d.join().unwrap();
}
Child thread c: Received apple
Child thread d: Received apple
Child thread c: Received orange
Child thread c: Received banana
Child thread c: Received watermelon
Child thread d: Received orange
Child thread d: Received banana
Child thread d: Received watermelon