Home > database >  Async move closure vs. fold
Async move closure vs. fold

Time:11-10

Async closures are still unstable in Rust, as pointed out in the related question What is the difference between |_| async move {} and async move |_| {}, the answer to which I do not really understand.

As far as I do understand, the following is not an async closure:

let mut sum: i32 = 0;
stream::iter(1..25)
    .map(compute)
    .buffered(12)
    .for_each(|result| async move { sum =result; })
    .await;
println!("->{}", sum);

I was baffled by this, initially: sum is used in the for_each, but it is not moved, otherwise the println! would produce a compiler error. The compiler gives a warning, though, that the "value assigned to sum is never read". But, in fact, sum is copied.

Here is the complete example code

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute(i: i32) -> i32 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..1000);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
    println!("#{} done", i);
    i * i
}

async fn sum_with_fold() {
    let sum = stream::iter(1..25)
        .map(compute)
        .buffered(12)
        .fold(0, |sum,x| async move {sum x} )
        .await;
    println!("->{}", sum);
}

async fn sum_with_closure() {
    let mut sum: i32 = 0;
    stream::iter(1..25)
        .map(compute)
        .buffered(12)
        .for_each(|result| async move { sum =result; })
        .await;
    println!("->{}", sum);
}

#[tokio::main]
async fn main() {
    sum_with_fold().await;
    sum_with_closure().await;
}

// Cargo.toml:
// [dependencies]
// futures = "0.3"
// rand = "0.8"
// tokio = { version = "1", features = ["full"] }

The fold works correctly, whereas sum_with_closure works on a copied sum and this sum cannot be retrieved.

Am I getting this right, and can it be fixed? I.e. is there a way to do the fold with a closure like this? Or am I indeed running into the unstable async closure feature?

CodePudding user response:

This can be done with for_each, but current Rust can't check at compile-time that the lifetimes are correct so you need to use a RefCell to enable run-time checking (with a small performance cost):

async fn sum_with_closure() {
    use std::cell::RefCell;

    let sum = RefCell::new (0);
    let sumref = ∑
    stream::iter(1..25)
        .map(compute)
        .buffered(12)
        .for_each(|result| async move { *sumref.borrow_mut()  =result; })
        .await;
    println!("->{}", sum.into_inner());
}

Playground

  • Related