Async closures are still unstable in Rust, as pointed out in the related question "What is the difference between |_| async move {} and async move |_| {}", the answer to which I do not really understand.
As far as I understand, the following is not an async closure:
let mut sum: i32 = 0;
stream::iter(1..25)
    .map(compute)
    .buffered(12)
    .for_each(|result| async move { sum += result; })
    .await;
println!("->{}", sum);
I was baffled by this initially: sum is used in the for_each, but it is not moved, otherwise the println! would produce a compiler error. The compiler does give a warning, though, that the "value assigned to sum is never read". In fact, sum is copied.
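The copy can be seen in isolation without a stream. The following is a minimal sketch using only the standard library; NoopWake, block_on and copied_sum are hypothetical names made up for this illustration, and the busy-polling block_on is only suitable for futures that never actually wait.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper (not from the question): a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Mirrors the body passed to for_each: every `async move` block
// receives its own copy of `sum`.
fn copied_sum() -> i32 {
    let mut sum: i32 = 0;
    for result in [1, 4, 9] {
        // `i32` is `Copy`, so `move` copies `sum` into the future.
        // The `+=` changes that copy, not the outer variable.
        block_on(async move {
            sum += result;
        });
    }
    sum // the outer variable was never modified
}
```

Calling copied_sum() returns 0, and the compiler emits the same kind of "value assigned to sum is never read" warning as in the snippet above.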
Here is the complete example code:
use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;
async fn compute(i: i32) -> i32 {
    // Sleep for a random time below one second to simulate work.
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..1000);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    println!("#{} done", i);
    i * i
}
async fn sum_with_fold() {
    let sum = stream::iter(1..25)
        .map(compute)
        .buffered(12)
        // The accumulator is passed into and returned from each future.
        .fold(0, |sum, x| async move { sum + x })
        .await;
    println!("->{}", sum);
}
async fn sum_with_closure() {
    let mut sum: i32 = 0;
    stream::iter(1..25)
        .map(compute)
        .buffered(12)
        // `async move` copies `sum` (i32 is Copy) into each future.
        .for_each(|result| async move { sum += result; })
        .await;
    println!("->{}", sum);
}
#[tokio::main]
async fn main() {
    sum_with_fold().await;
    sum_with_closure().await;
}
// Cargo.toml:
// [dependencies]
// futures = "0.3"
// rand = "0.8"
// tokio = { version = "1", features = ["full"] }
The fold works correctly, whereas sum_with_closure works on a copied sum, and this sum cannot be retrieved.
Am I getting this right, and can it be fixed? I.e. is there a way to do the fold with a closure like this? Or am I indeed running into the unstable async closure feature?
Answer:
This can be done with for_each, but current Rust can't check at compile time that the lifetimes are correct, so you need to use a RefCell to enable run-time checking (with a small performance cost):
async fn sum_with_closure() {
    use std::cell::RefCell;
    let sum = RefCell::new(0);
    // A shared reference is Copy, so every future gets its own `&RefCell`.
    let sumref = &sum;
    stream::iter(1..25)
        .map(compute)
        .buffered(12)
        .for_each(|result| async move { *sumref.borrow_mut() += result; })
        .await;
    println!("->{}", sum.into_inner());
}
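To try the RefCell pattern without tokio or futures, here is a rough standard-library-only sketch. NoopWake, block_on and shared_sum are hypothetical names introduced for this illustration, a plain loop stands in for the stream, and the busy-polling block_on only works for futures that never actually wait.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper (not from the answer): a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn shared_sum() -> i32 {
    let sum = RefCell::new(0);
    let sumref = &sum;
    // Same closure shape as the one passed to for_each: `async move`
    // copies the shared reference, so all futures point at one RefCell.
    let add = |result: i32| async move {
        // The mutable borrow is checked at run time and released
        // at the end of this statement.
        *sumref.borrow_mut() += result;
    };
    for result in [1, 4, 9] {
        block_on(add(result));
    }
    sum.into_inner()
}
```

Here shared_sum() returns 14 (1 + 4 + 9), because each future updates the same RefCell through the copied reference instead of updating a private copy of the integer.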