How to concurrently crawl paginated webpages with unknown end?

Time:09-26

I'm trying to write a web crawler in Rust using the tokio asynchronous runtime. I want to fetch and process multiple pages concurrently, but I also want the crawler to stop when it reaches the end (in other words, when there is nothing left to crawl). So far I have used futures::future::try_join_all to get a collective result from the async functions that I provide as futures, but this requires the program to know the total number of pages to crawl beforehand. For example:

async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    Ok(String::from("foo"))
}

#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";

    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));

    let _ = futures::future::try_join_all(futures).await.unwrap();
}

In this simple example I have to know the total number of pages to go through (1..=3) before actually fetching them. What I want is to provide no range at all and instead have a condition that stops the whole process (e.g. if the HTML result contains "not found").

I looked into futures::executor::block_on but I'm not sure if it is something that I can utilize for this task.

Answer:

Here's roughly how to do this using Stream and .buffered():

use futures::{future, stream, StreamExt};

#[derive(Debug)]
struct Error;

async fn fetch_page(page: i32) -> Result<String, Error> {
    println!("fetching page: {}", page);

    // simulate loading pages
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    if page < 5 {
        // successfully got page
        Ok(String::from("foo"))
    } else {
        // page doesn't exist
        Err(Error)
    }
}

#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        .take_while(|page| future::ready(page.is_ok()))
        .map(|page| page.unwrap())
        .collect()
        .await;

    println!("pages: {:?}", pages);
}

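I'll go over the steps in main() in detail:

  • stream::iter(1..) creates an unbounded Stream of page numbers.
  • .map(fetch_page) calls fetch_page for each page number, producing a stream of futures. Nothing runs yet, because futures are lazy.
  • .buffered(10) runs up to 10 of those futures concurrently and yields their results in the original page order.
  • .take_while(...) ends the stream at the first result that is an Err.
  • .map(|page| page.unwrap()) unwraps the remaining results, which are all Ok at this point.
  • .collect() gathers the pages into a Vec<String>.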

Running the above code prints out the following, showing that it tries 10 at a time but will only return up to the first failure:

fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]

This glosses over some nice-to-haves like handling non-missing-page errors or retrying, but I hope this gives you a good foundation. In those cases you might reach for the methods on TryStreamExt, which specially handle streams of Results.
