Is it possible to both read a stream of Futures from a set of JoinHandle<()> tasks and update that set of tasks with new tasks at the same time?
I currently have a Service that runs some long tasks. The only catch is that I would like, if possible, to add new tasks while it is running, triggered by a flag sent over some type of Receiver channel (not shown below to keep things simple).
Given that handles becomes owned by Service::run, I would lean towards "no", this is not possible. Is this true? If it isn't possible with my setup, is there some way I could tweak the code below to make it possible?
I read in this answer that wrapping the HashMap in an Option allows me to use .take() in Service::run, since the value needs to be owned in order to call .into_values(). However, the problem with this is that .take() consumes the value in the Mutex, leaving None in its wake.
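To make the .take() problem concrete, here is a small std-only sketch. It rests on a few assumptions: std::sync::Mutex stands in for the async Mutex, plain u32 values stand in for the JoinHandle<()> entries, and the helper names drain_handles and try_insert are made up for illustration. It only shows the ownership behaviour, not the async parts.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the real `Handles` type: u32 values replace JoinHandle<()>.
type Handles = Arc<Mutex<Option<HashMap<String, u32>>>>;

/// Mimics what Service::run does: move the map out of the Mutex with
/// `.take()` so that `.into_values()` can consume it.
/// (Hypothetical helper, not part of the code in this question.)
fn drain_handles(handles: &Handles) -> Vec<u32> {
    // The lock guard is dropped at the end of this statement.
    let map = handles.lock().unwrap().take().unwrap_or_default();
    let mut values: Vec<u32> = map.into_values().collect();
    values.sort(); // HashMap iteration order is unspecified
    values
}

/// Mimics the update task: insert a new entry if the map is still there.
/// Returns false once the map has been taken out of the Option.
/// (Hypothetical helper, not part of the code in this question.)
fn try_insert(handles: &Handles, id: &str, value: u32) -> bool {
    let mut guard = handles.lock().unwrap();
    if let Some(map) = guard.as_mut() {
        map.insert(id.to_string(), value);
        return true;
    }
    false
}
```

Calling try_insert before drain_handles succeeds, but any try_insert after drain_handles returns false, because the shared Option is now None. That is the situation the update task ends up in once Service::run has called .take().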
Here is my minimal reproducible example (did not compile this, but should give the idea):
use std::collections::HashMap;

use async_std::sync::{Arc, Mutex};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};
use log::info; // assumption: info! comes from the log crate
use tokio::{
    task::JoinHandle,
    time::{sleep, Duration},
};

type Handles = Arc<Mutex<Option<HashMap<String, JoinHandle<()>>>>>;

fn a_task() -> impl Future<Output = ()> {
    async move {
        sleep(Duration::from_secs(3)).await;
    }
}

fn the_update_task(handles: Handles) -> impl Future<Output = ()> {
    async move {
        // would like to update `handles` here as I get new data from a channel
        // calling .take() in Service::run nukes my handles here :(
    }
}

struct Service {
    handles: Handles,
}

impl Service {
    fn new() -> Self {
        let handles = Arc::new(Mutex::new(Some(HashMap::default())));
        // The update task runs in the background; its JoinHandle is not kept.
        tokio::spawn(the_update_task(handles.clone()));
        Self { handles }
    }

    async fn add_a_task(&mut self, id: String) {
        let handle = tokio::spawn(a_task());
        self.handles.lock().await.as_mut().unwrap().insert(id, handle);
    }

    async fn run(self) {
        let Service { handles, .. } = self;
        // .take() moves the map out of the Mutex and leaves None behind.
        let mut futs = FuturesUnordered::from_iter(
            handles.lock().await.take().unwrap().into_values()
        );
        while let Some(res) = futs.next().await {
            info!("I completed a task! {res:?}");
        }
    }
}

#[tokio::main]
async fn main() {
    let mut srvc = Service::new();
    srvc.add_a_task("1".to_string()).await;
    srvc.add_a_task("2".to_string()).await;
    let handle = tokio::spawn(srvc.run());
    handle.await.unwrap();
}
I have tried:
- Using Arc(Mutex(HashMap))
- Using Arc(Mutex(Option(HashMap)))
I always seem to arrive at the same conclusion:
- I cannot both own handles in Service::run and update handles (even a copy/reference) from another part of the code
CodePudding user response:
Just answering my own question here with the help of @user1937198's comment.
The solution was to update a reference to the FuturesUnordered directly with new tasks, as opposed to being concerned with handles. This simplifies things quite a bit.
use std::collections::HashMap;

use async_std::sync::{Arc, Mutex};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};
use log::info; // assumption: info! comes from the log crate
use tokio::{
    task::JoinHandle,
    time::{sleep, Duration},
};

fn a_task() -> impl Future<Output = ()> {
    async move {
        sleep(Duration::from_secs(3)).await;
    }
}

fn the_update_task(
    futs: Arc<Mutex<FuturesUnordered<JoinHandle<()>>>>,
) -> impl Future<Output = ()> {
    async move {
        // Just push another task
        let fut = tokio::spawn(a_task());
        futs.lock().await.push(fut);
    }
}

struct Service {
    handles: HashMap<String, JoinHandle<()>>,
}

impl Service {
    fn new() -> Self {
        let handles = HashMap::default();
        Self { handles }
    }

    async fn add_a_task(&mut self, id: String) {
        let handle = tokio::spawn(a_task());
        self.handles.insert(id, handle);
    }

    async fn run(self) {
        let Service { handles, .. } = self;
        let futs = Arc::new(Mutex::new(FuturesUnordered::from_iter(handles.into_values())));
        // The update task is awaited here, so it has finished pushing
        // before the loop below starts polling.
        tokio::spawn(the_update_task(futs.clone())).await.unwrap();
        // Note: the lock guard is held while awaiting next(), so an updater
        // running concurrently would wait for a task to complete before pushing.
        while let Some(res) = futs.lock().await.next().await {
            info!("I completed a task! {res:?}");
        }
    }
}

#[tokio::main]
async fn main() {
    let mut srvc = Service::new();
    srvc.add_a_task("1".to_string()).await;
    srvc.add_a_task("2".to_string()).await;
    let handle = tokio::spawn(srvc.run());
    handle.await.unwrap();
}