Tokio thread management: restart subsidiary thread from inside encapsulating thread?

Time:09-05

use tokio;
use tokio::time::{sleep, Duration};
use std::time::{UNIX_EPOCH, SystemTime};
use anyhow::{Result, Error};

#[tokio::main]
async fn main() -> Result<()> {
    let task1 = tokio::spawn(async move{
        loop{
            sleep(Duration::from_millis(3)).await;
            let now = SystemTime::now()
                        .duration_since(UNIX_EPOCH)?
                        .as_millis();
            if now as u128 % 9 == 0 {
                //on >2 iterations here,
                //if "task2" is running
                //task2 should be terminated
                //and the block below executed again
                let task2 = tokio::spawn(async move{
                    println!("abc");
                    println!("cde");
                    println!("erg");
                });
                //how to join task 2 here 
                //so that task1 isn't blocked?
            }
        }
        // Never reached, but tells the compiler the error type for `?` above.
        Ok::<(), Error>(())
    });
    tokio::join![task1];
    Ok(())
}


I need to run 2 or more threads:

  • [controller thread] a 'timer' thread, running in a loop{}, that controls the flow of my program,
  • [helper thread] operates on the data that thread 1 provides, either as a single execution or in a loop{}

When task1's conditions are met, the data has changed and I need to abort task2. There is no need for it to keep operating on the old data; instead, it has to be restarted.

I've found solutions such as task.abort(), but I'm worried this would make my code a total mess. I've used it in my Playground example nonetheless (I can't figure out how to implement it there without task2 being external to task1).

Ideally, I'd love to somehow declare task2 outside of the body of task1 with a handle specifying whether an instance is currently running, and control it based on that.

Still, this is already 3-4 levels of indentation before I even started typing out any code and it bothers me.

My inquiries:

  • is there a way to declare a reusable block to spawn, so that a restart is less messy?
  • is there a direct method, or set of methods, for smoothly restarting threads within a program's architecture?

What I need is to .abort() the running thread and then run the same thread again from the start.

Answer:

There are two ways of cancelling a task:

  • gracefully, via something like a CancellationToken
  • forcefully, via .abort().

Here is one possible solution for a graceful cancellation:

use anyhow::Result;
use tokio;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

async fn task2(value: u32, cancellation_token: CancellationToken) {
    'outer: loop {
        for msg in ["abc", "cde", "erg"] {
            println!("task2: {} {}", msg, value);

            tokio::select! {
                _ = sleep(Duration::from_millis(500)) => {},
                _ = cancellation_token.cancelled() => break 'outer,
            };
        }
    }
}

async fn task1() -> Result<()> {
    let mut value = 42;
    let mut task2_cancellation_token = CancellationToken::new();
    let mut task2_handle = tokio::spawn(task2(value, task2_cancellation_token.clone()));

    let mut iteration = 0;
    loop {
        sleep(Duration::from_millis(300)).await;
        iteration += 1;
        if iteration % 9 == 0 {
            println!("Cancelling task2 ...");
            task2_cancellation_token.cancel();
            println!("Waiting for task2 to finish ...");
            task2_handle.await?;

            println!("Updating value ...");
            value += 1;

            println!("Restarting task2 ...");
            task2_cancellation_token = CancellationToken::new();
            task2_handle = tokio::spawn(task2(value, task2_cancellation_token.clone()));
        };
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    task1().await
}

Output:

task2: abc 42
task2: cde 42
task2: erg 42
task2: abc 42
task2: cde 42
task2: erg 42
Cancelling task2 ...
Waiting for task2 to finish ...
Updating value ...
Restarting task2 ...
task2: abc 43
task2: cde 43
task2: erg 43
...

Alternatively, here is a forceful cancellation via .abort():

use anyhow::Result;
use tokio;
use tokio::time::{sleep, Duration};

async fn task2(value: u32) {
    loop {
        for msg in ["abc", "cde", "erg"] {
            println!("task2: {} {}", msg, value);
            sleep(Duration::from_millis(500)).await;
        }
    }
}

async fn task1() -> Result<()> {
    let mut value = 42;
    let mut task2_handle = tokio::spawn(task2(value));

    let mut iteration = 0;
    loop {
        sleep(Duration::from_millis(300)).await;
        iteration += 1;
        if iteration % 9 == 0 {
            println!("Aborting task2 ...");
            task2_handle.abort();
            // Ignore return value, as it will most likely
            // be a cancelled `JoinError`.
            task2_handle.await.ok();

            println!("Updating value ...");
            value += 1;

            println!("Restarting task2 ...");
            task2_handle = tokio::spawn(task2(value));
        };
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    task1().await
}

Output:

task2: abc 42
task2: cde 42
task2: erg 42
task2: abc 42
task2: cde 42
task2: erg 42
Aborting task2 ...
Updating value ...
Restarting task2 ...
task2: abc 43
task2: cde 43
task2: erg 43
...

Advantages and disadvantages

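Graceful cancellation is harder to implement correctly, because the task itself has to check for the cancellation signal at suitable points. If your task can safely be stopped at any .await point, abort() is easier and doesn't require any additional code in the cancelled task.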

That said, the task has no way to react to abort(). If your task can only be cancelled in specific states, or it has to run some shutdown functionality on cancellation, a graceful shutdown is the only possibility.
