How to await `JoinHandle`s and update `JoinHandle`s at the same time?

Time:01-22

Is it possible to both read a stream of Futures from a set of JoinHandle<()> tasks and update that set of tasks with new tasks at the same time?

I currently have a Service that runs some long tasks. If possible, I would also like to add new tasks while those are running, triggered by a message arriving on some kind of Receiver channel (not shown below to keep things simple).

Since Service::run takes ownership of handles, I lean towards "no, this is not possible". Is that true? If it is not possible with my setup, is there some way I could tweak the code below to make it work?

I read in this answer that wrapping the HashMap in an Option lets me call .take() in Service::run, since the map must be owned in order to call .into_values(). The problem is that .take() moves the map out of the Mutex and leaves None in its place, so the update task no longer has anything to insert into.
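The effect of .take() can be reproduced without any async machinery. The sketch below is an illustration only: std::sync::Mutex stands in for the async mutex, u32 values stand in for JoinHandle<()>, and the names Shared, take_all and try_insert are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the `Handles` type: u32 replaces JoinHandle<()> (assumption).
type Shared = Arc<Mutex<Option<HashMap<String, u32>>>>;

/// Moves the map out of the shared slot, leaving `None` behind.
fn take_all(shared: &Shared) -> Option<HashMap<String, u32>> {
    shared.lock().unwrap().take()
}

/// Inserts into the shared map; returns false once the map has been taken.
fn try_insert(shared: &Shared, id: &str, value: u32) -> bool {
    let mut guard = shared.lock().unwrap();
    if let Some(map) = guard.as_mut() {
        map.insert(id.to_string(), value);
        true
    } else {
        false
    }
}
```

Every clone of the Arc sees the same Option, so after the first take_all no other holder can insert.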

Here is my minimal reproducible example (I did not compile this, but it should give the idea):

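use std::collections::HashMap;
use tokio::{task::JoinHandle, time::{sleep, Duration}};
use async_std::sync::{Arc, Mutex};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};
use tracing::info; // assumed logging crate; `log::info` works the same way here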

type Handles = Arc<Mutex<Option<HashMap<String, JoinHandle<()>>>>>;

fn a_task() -> impl Future<Output = ()> {
  async move {
    sleep(Duration::from_secs(3)).await;
  }
}


fn the_update_task(handles: Handles) -> impl Future<Output = ()> {
  async move {
    // would like to update `handles` here as I get new data from a channel
    // calling .take() in Service::run nukes my handles here :(
  }
}

struct Service {
  handles: Handles,
}

impl Service {
  fn new() -> Self {
    let handles = Arc::new(Mutex::new(Some(HashMap::default())));
    let _handle = tokio::spawn(the_update_task(handles.clone()));
    Self { handles }
  }

  async fn add_a_task(&mut self, id: String) {
    let handle = tokio::spawn(a_task());
    self.handles.lock().await.as_mut().unwrap().insert(id, handle);
  }

  async fn run(self) {
    let Service { handles, .. } = self;
    let mut futs = FuturesUnordered::from_iter(
       handles.lock().await.take().unwrap().into_values()
    );
    while let Some(fut) = futs.next().await {
      info!("I completed a task! {fut:?}");
    }
  }
}

#[tokio::main]
async fn main() {
   let mut srvc = Service::new();
   srvc.add_a_task("1".to_string()).await;
   srvc.add_a_task("2".to_string()).await;

   let handle = tokio::spawn(srvc.run());
   handle.await.unwrap();
}

I have tried

  • Using Arc<Mutex<HashMap<String, JoinHandle<()>>>>
  • Using Arc<Mutex<Option<HashMap<String, JoinHandle<()>>>>>

I always seem to arrive at the same conclusion:

  • I cannot both own handles in Service::run and update handles (even a copy or reference) from another part of the code

CodePudding user response:

Just answering my own question here with the help of @user1937198's comment.

The solution was to share a reference to the FuturesUnordered itself and push new tasks onto it directly, instead of sharing the map of handles. This simplifies things quite a bit.

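use std::collections::HashMap;
use tokio::{task::JoinHandle, time::{sleep, Duration}};
use async_std::sync::{Arc, Mutex};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};
use tracing::info; // assumed logging crate; `log::info` works the same way here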

fn a_task() -> impl Future<Output = ()> {
  async move {
    sleep(Duration::from_secs(3)).await;
  }
}


fn the_update_task(futs: Arc<Mutex<FuturesUnordered<JoinHandle<()>>>>) -> impl Future<Output = ()> {
  async move {
    // Just push another task
    let fut = tokio::spawn(a_task());
    futs.lock().await.push(fut);
  }
}

struct Service {
  handles: HashMap<String, JoinHandle<()>>,
}

impl Service {
  fn new() -> Self {
    let handles = HashMap::default();
    Self { handles }
  }

  async fn add_a_task(&mut self, id: String) {
    let handle = tokio::spawn(a_task());
    self.handles.insert(id, handle);
  }

  async fn run(self) {
    let Service { handles, .. } = self;
    let futs = Arc::new(Mutex::new(FuturesUnordered::from_iter(handles.into_values())));

    tokio::spawn(the_update_task(futs.clone())).await.unwrap();

    while let Some(fut) = futs.lock().await.next().await {
      info!("I completed a task! {fut:?}");
    }
  }
}

#[tokio::main]
async fn main() {
   let mut srvc = Service::new();
   srvc.add_a_task("1".to_string()).await;
   srvc.add_a_task("2".to_string()).await;

   let handle = tokio::spawn(srvc.run());
   handle.await.unwrap();
}