I am working on a project with long compute times that will run on hundreds of nodes. As part of my implementation I have a status handler object/struct that talks to the API and fetches the needed information, such as the arguments; the status handler then calls the main compute-intensive function.
In order to keep tabs on the computationally intensive function, I would like it to yield back to the status handler with the completed percentage, so the status handler can update the API and then let the intensive function continue without losing any of its stack (such as its variables and file handles).
I've looked into async functions, but they seem to only return once.
Thank you in advance!
CodePudding user response:
What you are asking for are generators, which are currently not stable. You can either experiment with them on nightly, or manually create something that works like them and call it yourself (though it won't have as nice a syntax as generators). Something like this, for example:
enum Status<T, K> {
    Updated(T),
    Finished(K),
}

struct State { /* ... */ }

impl State {
    pub fn new() -> Self {
        Self { /* ... */ }
    }

    pub fn call(self) -> Status<Self, ()> {
        // Do some update on State and return
        // Status::Updated(self). When you are finished,
        // return Status::Finished(()). Note that this
        // method consumes self. You could make it take
        // &mut self, but then you would have to worry
        // about how to prevent it being called after
        // it finished. If you want to return some intermediate
        // value in each step, you can make Status::Updated
        // contain a (state, value) instead.
        todo!()
    }
}

fn foo() {
    let mut state = State::new();
    let result = loop {
        match state.call() {
            Status::Updated(s) => state = s,
            Status::Finished(result) => break result,
        }
    };
}
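To make the skeleton above concrete, here is a minimal sketch of the same pattern with an actual workload. The `SumJob` type, its fields, and the `run` helper are hypothetical names chosen for illustration; the "expensive" work is just summing integers in chunks, and `Status::Updated` carries a `(state, percent)` pair as suggested in the comment.

```rust
/// What a single step of the computation reports back.
enum Status<T, K> {
    Updated(T),
    Finished(K),
}

/// Hypothetical job: sums the integers 0..total in fixed-size chunks.
struct SumJob {
    next: u64,
    total: u64,
    acc: u64,
}

impl SumJob {
    fn new(total: u64) -> Self {
        Self { next: 0, total, acc: 0 }
    }

    /// Runs one chunk of work, then hands control back to the caller.
    /// Intermediate steps return the job itself plus the percentage done.
    /// `chunk` is assumed to be non-zero, otherwise no progress is made.
    fn step(mut self, chunk: u64) -> Status<(Self, f32), u64> {
        let end = (self.next + chunk).min(self.total);
        for i in self.next..end {
            self.acc += i;
        }
        self.next = end;
        if self.next == self.total {
            Status::Finished(self.acc)
        } else {
            let percent = self.next as f32 / self.total as f32 * 100.0;
            Status::Updated((self, percent))
        }
    }
}

/// Drives the job to completion and collects every progress report.
fn run(total: u64, chunk: u64) -> (u64, Vec<f32>) {
    let mut job = SumJob::new(total);
    let mut reports = Vec::new();
    let result = loop {
        match job.step(chunk) {
            Status::Updated((j, percent)) => {
                // A status handler would update the API here.
                reports.push(percent);
                job = j;
            }
            Status::Finished(sum) => break sum,
        }
    };
    (result, reports)
}
```

Calling `run(1_000, 250)` performs four steps: three report progress and the fourth returns the final sum. Everything the job needs between steps lives in the struct, which is what replaces the "kept stack" a real generator would give you.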
CodePudding user response:
Async can in fact pause and resume, but it is meant for I/O-bound programs that basically wait for some external I/O the entire time. It's not meant for computation-heavy tasks.
There are two ways that come to my mind on how to solve this problem:
- threads & channels
- callbacks
Solution 1: Threads & Channels
use std::{sync::mpsc, thread, time::Duration};

struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

impl StatusUpdate {
    pub fn new(task_id: i32, percent: f32) -> Self {
        Self { task_id, percent }
    }
}

fn expensive_computation(id: i32, status_update: mpsc::Sender<StatusUpdate>) {
    status_update.send(StatusUpdate::new(id, 0.0)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 33.3)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 66.6)).unwrap();
    thread::sleep(Duration::from_millis(1000));
    status_update.send(StatusUpdate::new(id, 100.0)).unwrap();
}

fn main() {
    let (status_sender_1, status_receiver) = mpsc::channel();
    let status_sender_2 = status_sender_1.clone();

    thread::spawn(move || expensive_computation(1, status_sender_1));
    thread::spawn(move || expensive_computation(2, status_sender_2));

    // The loop ends once every sender has been dropped.
    for status_update in status_receiver {
        println!(
            "Task {} is done {} %",
            status_update.task_id, status_update.percent
        );
    }
}
Task 1 is done 0 %
Task 2 is done 0 %
Task 1 is done 33.3 %
Task 2 is done 33.3 %
Task 1 is done 66.6 %
Task 2 is done 66.6 %
Task 2 is done 100 %
Task 1 is done 100 %
Solution 2: Callbacks
use std::{thread, time::Duration};

struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

impl StatusUpdate {
    pub fn new(task_id: i32, percent: f32) -> Self {
        Self { task_id, percent }
    }
}

fn expensive_computation<F: FnMut(StatusUpdate)>(id: i32, mut update_status: F) {
    update_status(StatusUpdate::new(id, 0.0));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 33.3));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 66.6));
    thread::sleep(Duration::from_millis(1000));
    update_status(StatusUpdate::new(id, 100.0));
}

fn main() {
    expensive_computation(1, |status_update| {
        println!(
            "Task {} is done {} %",
            status_update.task_id, status_update.percent
        );
    });
}
Task 1 is done 0 %
Task 1 is done 33.3 %
Task 1 is done 66.6 %
Task 1 is done 100 %
Note that with the channels solution it is much easier to handle multiple computations on different threads at once. With callbacks, communicating between threads takes extra work: the callback itself has to own something thread-safe, such as a channel sender.
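One way to bridge the two approaches is to keep the callback-based signature and have the callback forward each update into a channel. The following is only a sketch: the `run_tasks` helper is a hypothetical name, the percentages are made up, and the sleeps from the examples above are left out for brevity.

```rust
use std::{sync::mpsc, thread};

struct StatusUpdate {
    task_id: i32,
    percent: f32,
}

// Same callback-based shape as in Solution 2, without the sleeps.
fn expensive_computation<F: FnMut(StatusUpdate)>(id: i32, mut update_status: F) {
    for percent in [0.0, 50.0, 100.0] {
        update_status(StatusUpdate { task_id: id, percent });
    }
}

/// Hypothetical helper: runs one computation per id on its own thread and
/// returns every (task_id, percent) update in the order it was received.
fn run_tasks(ids: &[i32]) -> Vec<(i32, f32)> {
    let (sender, receiver) = mpsc::channel();
    let mut handles = Vec::new();

    for &id in ids {
        // Each thread gets its own clone of the sender, captured by the callback.
        let sender = sender.clone();
        handles.push(thread::spawn(move || {
            expensive_computation(id, move |update| {
                // Ignore the error case where the receiver has gone away.
                let _ = sender.send(update);
            });
        }));
    }
    // Drop the original sender so the receive loop ends when all threads finish.
    drop(sender);

    let mut seen = Vec::new();
    for update in receiver {
        seen.push((update.task_id, update.percent));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    seen
}
```

The computation itself stays unaware of threads or channels; only the closure passed in knows where the updates go. Updates from different tasks may interleave in any order, but the updates of a single task arrive in the order they were sent.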
Am I able to pause execution of the expensive function while I do something with that data, then allow it to resume?
No, in general this is not something that can be done with threads.
You can run them with a lower priority than the main thread, which means they won't be scheduled as aggressively, which decreases the latency in the main thread. But in general, operating systems are preemptive and are capable of switching back and forth between threads, so you shouldn't worry about 'pausing'.
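A thread cannot be suspended from the outside, but the worker can choose to wait. If the computation really must not continue until the handler has dealt with an update, one possible approach is a second channel used as an acknowledgement. This is a sketch under assumptions: `worker` and `run` are hypothetical names, and the summing loop merely stands in for real work.

```rust
use std::{sync::mpsc, thread};

/// Hypothetical worker: reports progress after each quarter of the work,
/// then blocks until the handler acknowledges the report.
fn worker(progress: mpsc::Sender<u8>, ack: mpsc::Receiver<()>) -> u64 {
    let mut acc = 0u64;
    for step in 1..=4u8 {
        // Stand-in for a quarter of the real computation.
        acc += (0..1_000u64).sum::<u64>();

        // Report progress; stop early if the handler has gone away.
        if progress.send(step * 25).is_err() {
            break;
        }
        // Block here until the handler says "continue". Local variables
        // such as `acc` stay alive on this thread's stack while it waits.
        if ack.recv().is_err() {
            break;
        }
    }
    acc
}

/// Hypothetical handler loop: receives each report, does its own work,
/// then lets the worker resume.
fn run() -> (u64, Vec<u8>) {
    let (progress_tx, progress_rx) = mpsc::channel();
    let (ack_tx, ack_rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(progress_tx, ack_rx));

    let mut seen = Vec::new();
    for percent in progress_rx {
        // The worker is blocked on `ack.recv()` while this runs,
        // so this is where the API update would go.
        seen.push(percent);
        let _ = ack_tx.send(());
    }
    (handle.join().unwrap(), seen)
}
```

The cost of this design is that the worker sits idle during every update, so it only makes sense when the handler genuinely needs the computation to stand still. Otherwise the plain channel solution lets both sides run concurrently.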
CodePudding user response:
This is what I came up with using Tokio:
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(expensive_computation(tx));

    // recv() returns None once the sender has been dropped.
    while let Some(message) = rx.recv().await {
        write_data(message).await;
    }
}

async fn expensive_computation(tx: mpsc::Sender<String>) {
    for i in 0..100 {
        // Report progress every tenth iteration.
        if i % 10 == 0 {
            tx.send(format!("{}", i)).await.unwrap();
        }
        println!("expensive_computation {}", i);
    }
}

async fn write_data(i: String) {
    println!("Writing data {}", i);
}