Tokio subtask within task not executing as expected

Time:07-07

Although I am new to Rust, after reading the Rust and Tokio books I thought I understood how async tasks work. However, I have obviously missed something crucial, since I can't seem to resolve the following problem:

I have an async function (asfunc) that needs to poll some sensors indefinitely. For the purposes of demonstration and testing, I created the following stylized function emulating a steady source:

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep( time::Duration::from_secs(1));
        let now = time::Instant::now(); 
        tx.send(format!("[{}] {:?}",name, now)).unwrap();
    }
}

I will have to run several of these concurrently from within a main program, and some of these async functions will be wrapped in another async function (wrapper()) for post-processing of the data received. Below is the core functionality (it's in a file called th_test.rs):

use std::sync::mpsc::Sender; 
use std::{thread, time};

async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    
    tokio::task::spawn(async move {
        asfunc(tx,"two".to_string()).await;
    });

    while let Ok(msg) = rx.recv() {
        println!("[wrapper] {:?}",msg);
    }
}

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        thread::sleep( time::Duration::from_secs(1));
        let now = time::Instant::now(); 
        tx.send(format!("[{}] {:?}",name, now)).unwrap();
    }
}

pub async fn run_test() {    
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    
    tokio::task::spawn(async move {
        asfunc(tx,"one".to_string()).await;
    });

    tokio::task::spawn(async move {
        wrapper().await;
    });

    while let Ok(msg) = rx.recv() {
        println!("[asfunc] {:?}",msg);
    }
}

And for completeness' sake, here is the main.rs file:

mod th_test;

#[tokio::main]
async fn main() {    
    th_test::run_test().await;
}

When I run this, I would expect some output from the wrapper function as well, since the code is pretty much identical to what I have in the run_test() function, but what I get is only the output from "directly" calling asfunc:

[asfunc] [one] Instant { t: 797790.6589759s }
[asfunc] [one] Instant { t: 797791.6660039s }
[asfunc] [one] Instant { t: 797792.678952s }
...

What am I doing wrong here? Any help would be greatly appreciated!

CodePudding user response:

Never use thread::sleep() or std::sync::mpsc channels (or any other blocking operation) inside async fn. Use the corresponding async primitives: tokio::time::sleep() and tokio::sync::mpsc. With them, your code works.

The exact reason it fails is not clear to me: probably something, at some point, blocks all of the executor threads, so the runtime cannot run the spawned wrapper() task (I can confirm that it never runs by adding a simple println!() to it). But it doesn't matter: you should never block the executor. See Async: What is blocking?.

CodePudding user response:

I fully agree with everything @ChayimFriedman says.

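Here is some code to accompany his answer, assuming that you need to keep using std::sync::mpsc. It replaces thread::sleep() with tokio::time::sleep() and moves the blocking rx.recv() loops into tokio::task::spawn_blocking():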

use std::sync::mpsc::Sender;
use std::time;

async fn wrapper() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    tokio::task::spawn(async move {
        asfunc(tx, "two".to_string()).await;
    });

    tokio::task::spawn_blocking(move || {
        while let Ok(msg) = rx.recv() {
            println!("[wrapper] {:?}", msg);
        }
    })
    .await
    .unwrap();
}

async fn asfunc(tx: Sender<String>, name: String) {
    loop {
        tokio::time::sleep(time::Duration::from_secs(1)).await;
        let now = time::Instant::now();
        tx.send(format!("[{}] {:?}", name, now)).unwrap();
    }
}

pub async fn run_test() {
    let (tx, rx) = std::sync::mpsc::channel::<String>();

    tokio::task::spawn(async move {
        asfunc(tx, "one".to_string()).await;
    });

    tokio::task::spawn(async move {
        wrapper().await;
    });

    tokio::task::spawn_blocking(move || {
        while let Ok(msg) = rx.recv() {
            println!("[asfunc] {:?}", msg);
        }
    })
    .await
    .unwrap();
}

#[tokio::main]
async fn main() {
    run_test().await;
}

Output:

[wrapper] "[two] Instant { tv_sec: 100182, tv_nsec: 4500700 }"
[asfunc] "[one] Instant { tv_sec: 100182, tv_nsec: 4714700 }"
[wrapper] "[two] Instant { tv_sec: 100183, tv_nsec: 5557800 }"
[asfunc] "[one] Instant { tv_sec: 100183, tv_nsec: 5976800 }"
[wrapper] "[two] Instant { tv_sec: 100184, tv_nsec: 7570800 }"
[asfunc] "[one] Instant { tv_sec: 100184, tv_nsec: 7329700 }"
[wrapper] "[two] Instant { tv_sec: 100185, tv_nsec: 8777300 }"
[asfunc] "[one] Instant { tv_sec: 100185, tv_nsec: 9211100 }"
[asfunc] "[one] Instant { tv_sec: 100186, tv_nsec: 11582400 }"
[wrapper] "[two] Instant { tv_sec: 100186, tv_nsec: 11741700 }"
[wrapper] "[two] Instant { tv_sec: 100187, tv_nsec: 13959800 }"
...