Although I am new to Rust, after reading the Rust and Tokio books I thought I knew how async tasks work. Obviously I missed something crucial, though, since I can't resolve the following problem:
I have an async function (asfunc) that needs to poll some sensors indefinitely. For the purposes of demonstration and testing, I created the following stylized function emulating a steady source:
async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep(time::Duration::from_secs(1));
        let now = time::Instant::now();
        tx.send(format!("[{}] {:?}", name, now)).unwrap();
    }
}
I will have to run several of these concurrently from within a main program, and some of these async functions will be wrapped in another async function (wrapper()) for post-processing of the data received. Below is the core functionality (it's in a file called th_test.rs):
use std::sync::mpsc::Sender;
use std::{thread, time};

async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    tokio::task::spawn(async move {
        asfunc(tx, "two".to_string()).await;
    });
    while let Ok(msg) = rx.recv() {
        println!("[wrapper] {:?}", msg);
    }
}

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep(time::Duration::from_secs(1));
        let now = time::Instant::now();
        tx.send(format!("[{}] {:?}", name, now)).unwrap();
    }
}

pub async fn run_test() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    tokio::task::spawn(async move {
        asfunc(tx, "one".to_string()).await;
    });
    tokio::task::spawn(async move {
        wrapper().await;
    });
    while let Ok(msg) = rx.recv() {
        println!("[asfunc] {:?}", msg);
    }
}
And for completeness' sake, here is the main.rs file:
mod th_test;

#[tokio::main]
async fn main() {
    th_test::run_test().await;
}
When I run this, I would expect some output from the wrapper function as well, since the code is pretty much identical to what I have in the run_test() function, but what I get is only the output from "directly" calling asfunc:
[asfunc] [one] Instant { t: 797790.6589759s }
[asfunc] [one] Instant { t: 797791.6660039s }
[asfunc] [one] Instant { t: 797792.678952s }
...
What am I doing wrong here? Any help would be greatly appreciated!
CodePudding user response:
Never use thread::sleep() or std::sync::mpsc channels (or any other blocking operation) inside an async fn. Use the corresponding async primitives: tokio::time::sleep() and tokio::sync::mpsc. With them, your code works.
The exact reason it fails is not clear to me: probably the blocking calls end up occupying all executor threads at some point, so the runtime cannot run the spawned wrapper() task (I can confirm with a simple println!() that it never runs). But it doesn't matter: you should never block the executor. See Async: What is blocking?.
CodePudding user response:
I fully agree with everything @ChayimFriedman says.
Here is some code to accompany his answer, based on the fact that you say that you need to use std::sync::mpsc:
use std::sync::mpsc::Sender;
use std::time;

async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    tokio::task::spawn(async move {
        asfunc(tx, "two".to_string()).await;
    });
    // rx.recv() blocks, so run the receive loop on the blocking thread pool.
    tokio::task::spawn_blocking(move || {
        while let Ok(msg) = rx.recv() {
            println!("[wrapper] {:?}", msg);
        }
    })
    .await
    .unwrap();
}

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        // Async sleep: yields to the executor instead of blocking the thread.
        tokio::time::sleep(time::Duration::from_secs(1)).await;
        let now = time::Instant::now();
        tx.send(format!("[{}] {:?}", name, now)).unwrap();
    }
}

pub async fn run_test() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    tokio::task::spawn(async move {
        asfunc(tx, "one".to_string()).await;
    });
    tokio::task::spawn(async move {
        wrapper().await;
    });
    // Same as in wrapper(): keep the blocking recv() off the async workers.
    tokio::task::spawn_blocking(move || {
        while let Ok(msg) = rx.recv() {
            println!("[asfunc] {:?}", msg);
        }
    })
    .await
    .unwrap();
}

#[tokio::main]
async fn main() {
    run_test().await;
}
[wrapper] "[two] Instant { tv_sec: 100182, tv_nsec: 4500700 }"
[asfunc] "[one] Instant { tv_sec: 100182, tv_nsec: 4714700 }"
[wrapper] "[two] Instant { tv_sec: 100183, tv_nsec: 5557800 }"
[asfunc] "[one] Instant { tv_sec: 100183, tv_nsec: 5976800 }"
[wrapper] "[two] Instant { tv_sec: 100184, tv_nsec: 7570800 }"
[asfunc] "[one] Instant { tv_sec: 100184, tv_nsec: 7329700 }"
[wrapper] "[two] Instant { tv_sec: 100185, tv_nsec: 8777300 }"
[asfunc] "[one] Instant { tv_sec: 100185, tv_nsec: 9211100 }"
[asfunc] "[one] Instant { tv_sec: 100186, tv_nsec: 11582400 }"
[wrapper] "[two] Instant { tv_sec: 100186, tv_nsec: 11741700 }"
[wrapper] "[two] Instant { tv_sec: 100187, tv_nsec: 13959800 }"
...