How to wait for tokio tasks to finish?

Time:11-30

I am trying to write to a HashMap using the Arc<Mutex<T>> pattern as part of a website-scraping exercise inspired by the Rust Cookbook.

This first part uses the tokio runtime. The program never gets past waiting for the tasks to complete and returning the HashMap; it just hangs.

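use std::collections::HashMap;
use std::sync::Arc;

use reqwest::{StatusCode, Url};
use tokio::sync::Mutex;

// Assumed definition: the question does not show BoxResult.
type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

type Db = Arc<Mutex<HashMap<String, bool>>>;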

pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
    let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
        .await
        .unwrap();

    let arc = db.clone();

    let mut handles = Vec::new();

    for link in links.links_with_paths {
        let x = arc.clone();
        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    // I also tried awaiting each handle in turn:
    // for handle in handles {
    //     handle.await.expect("Task panicked!");
    // }

    futures::future::join_all(handles).await;

    let readables = arc.lock().await;

    for (key, value) in readables.clone().into_iter() {
        println!("Checking db: k, v ==>{} / {}", key, value);
    }

    let clone_db = readables.clone();

    return Ok(clone_db);
}

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        db.insert(url.to_string(), true);
    } else {
        db.insert(url.to_string(), false);
    }
}

async fn check_link(url: &Url) -> BoxResult<bool> {
    let res = reqwest::get(url.as_ref()).await?;
    Ok(res.status() != StatusCode::NOT_FOUND)
}

pub struct NodeUrl {
    domain: String,
    pub links_with_paths: Vec<Url>,
}

#[tokio::main]
async fn main() {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));

    let db = futures::executor::block_on(task::handle_async_tasks(db));
}

I would like to return the HashMap to main(), which should block until the result is ready. How can I wait for all of the spawned tasks to complete and then return the HashMap?

Answer:

let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();

This doesn't seem like a valid URL to me.

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        db.insert(url.to_string(), true);
    } else {
        db.insert(url.to_string(), false);
    }
}

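This is highly problematic. You hold the exclusive lock on the database for the entire duration of the HTTP request, which makes your application effectively serial: every other task waits for the lock while one request is in flight. On top of that, reqwest's async client has no request timeout by default (only the blocking client defaults to 30 seconds), so if a server is unresponsive and you have a lot of links to go through, the program may appear to 'hang'.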

You should hold the database lock for as short a time as possible, just long enough to do the insert:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        let mut db = db.lock().await;
        db.insert(url.to_string(), true);
    } else {
        let mut db = db.lock().await;
        db.insert(url.to_string(), false);
    }
}

Or, even better, eliminate the redundant if:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);
    let valid = check_link(&url).await.is_ok();
    db.lock().await.insert(url.to_string(), valid);
}

Finally, you didn't show your main function; the way you invoke handle_async_tasks, or any other work you have running alongside it, might also be problematic.

Answer:

My main issue was how to handle the MutexGuard, which in the end I solved by cloning the inner value and returning the clone.

There was no need to use futures::executor in main: since main already runs inside the tokio runtime (via #[tokio::main]), simply awaiting handle_async_tasks() was enough to get the final value.

Cloning the Arc once per task was enough; previously I also made an extra clone before the loop that passes it into the spawned tasks.

Thanks to @orlp for pointing out the flawed logic in process, which held the lock across the check_link call.

pub async fn handle_async_tasks() -> BoxResult<HashMap<String, bool>> {
    let get_links = NodeUrl::new("https://www.invernesscourier.co.uk/")
        .await
        .unwrap();

    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();

    for link in get_links.links_with_paths {
        let x = db.clone();

        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    futures::future::join_all(handles).await;

    let guard = db.lock().await;
    let cloned = guard.clone();

    Ok(cloned)
}

#[tokio::main]
async fn main() {
    let db = task::handle_async_tasks().await.unwrap();
    for (key, value) in db.into_iter() {
        println!("Checking db: {} / {}", key, value);
    }
}

This is by no means the best Rust code, but I wanted to share how I tackled things in the end.
