Home > Enterprise >  How to run multiple Tokio async tasks in a loop without using tokio::spawn?
How to run multiple Tokio async tasks in a loop without using tokio::spawn?

Time:04-07

I built a LED clock that also displays weather. My program does a couple of different things in a loop, each thing with a different interval:

  • updates the LEDs every 50ms,
  • checks the light level (to adjust the brightness) every 1 second,
  • fetches weather every 10 minutes,
  • actually some more, but that's irrelevant.

Updating the LEDs is the most critical: I don't want this to be delayed when e.g. weather is being fetched. This should not be a problem as fetching weather is mostly an async HTTP call.

Here's the code that I have:

let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
    tokio::select! {
      _ = measure_light_stream.tick() => {
        let light = lm.get_light();
        light_smooth.sp = light;
      },
      _ = update_weather_stream.tick() => {
        let fetched_weather = weather_service.get(&config).await;
        // Store the fetched weather for later access from the displaying function.
        weather_clock.weather = fetched_weather.clone();
      },
      _ = update_leds_stream.tick() => {
        // Some code here that actually sets the LEDs.
        // This code accesses the weather_clock, the light level etc.
      },
    }
}

I realised the code doesn't do what I wanted it to do - fetching the weather blocks the execution of the loop. I see why - the docs of tokio::select! say the other branches are cancelled as soon as the update_weather_stream.tick() expression completes.

How do I do this in such a way that while fetching the weather is waiting on network, the LEDs are still updated? I figured out I could use tokio::spawn to start a separate non-blocking "thread" for fetching weather, but then I have problems with weather_service not being Send, let alone weather_clock not being shareable between threads. I don't want this complication, I'm fine with everything running in a single thread, just like what select! does.

Reproducible example

use std::time::Duration;
use tokio::time::{interval, sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut slow_stream = interval(Duration::from_secs(3));
    let mut fast_stream = interval(Duration::from_millis(200));
    // Note how access to this data is straightforward, I do not want
    // this to get more complicated, e.g. care about threads and Send.
    let mut val = 1;
    loop {
        tokio::select! {
          _ = fast_stream.tick() => {
            println!(".{}", val);
          },
          _ = slow_stream.tick() => {
            println!("Starting slow operation...");
            // The problem: During this await the dots are not printed.
            sleep(Duration::from_secs(1)).await;
            val  = 1;
            println!("...done");
          },
        }
    }
}

CodePudding user response:

You can use tokio::join! to run multiple async operations concurrently within the same task.

Here's an example:

async fn measure_light(halt: &Cell<bool>) {
    while !halt.get() {
        let light = lm.get_light();
        // ....

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn blink_led(halt: &Cell<bool>) {
    while !halt.get() {
        // LED blinking code

        tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
    }
}

async fn poll_weather(halt: &Cell<bool>) {
    while !halt.get() {
        let weather = weather_service.get(&config).await;
        // ...

        tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
    }
}

// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
    tokio::time::sleep(Duration::from_secs(10)).await;
    halt.set(true);
}

async fn main() {
    let halt = Cell::new(false);
    tokio::join!(
        measure_light(&halt),
        blink_led(&halt),
        poll_weather(&halt),
        terminate(&halt),
    );
}

If you're using tokio::TcpStream or other non-blocking IO, then it should allow for concurrent execution.

I've added a Cell flag for halting execution as an example. You can use the same technique to share any mutable state between join branches.


EDIT: Same thing can be done with tokio::select!. The main difference with your code is that the actual "business logic" is inside the futures awaited by select.

select allows you to drop unfinished futures instead of waiting for them to exit on their own (so halt termination flag is not necessary).

async fn main() {
    tokio::select! {
        _ = measure_light() => {},
        _ = blink_led() = {},
        _ = poll_weather() => {},
    }
}
  • Related