I'm trying to write a web crawler in Rust using the tokio asynchronous runtime. I want to fetch/process multiple pages asynchronously, but I also want the crawler to stop when it reaches the end (in other words, when there is nothing left to crawl). So far I have used futures::future::try_join_all to get a collective result from the async functions that I provide as Futures, but this obviously requires the program to know the total number of pages to crawl beforehand. For example:
async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    Ok(String::from("foo"))
}
#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";

    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));

    let _ = futures::future::try_join_all(futures).await.unwrap();
}
In this simple example I have to know the total number of pages to go through (1..=3) before actually fetching them. What I want instead is to not provide any range and to have a condition that stops the whole process (e.g. if the HTML result contains "not found").
I looked into futures::executor::block_on but I'm not sure if it is something that I can utilize for this task.
CodePudding user response:
Here's roughly how to do this using Stream and .buffered():
use futures::{future, stream, StreamExt};

#[derive(Debug)]
struct Error;

async fn fetch_page(page: i32) -> Result<String, Error> {
    println!("fetching page: {}", page);

    // simulate loading pages
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    if page < 5 {
        // successfully got page
        Ok(String::from("foo"))
    } else {
        // page doesn't exist
        Err(Error)
    }
}
#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        .take_while(|page| future::ready(page.is_ok()))
        .map(|page| page.unwrap())
        .collect()
        .await;

    println!("pages: {:?}", pages);
}
I'll go over the steps in main() in detail:

- stream::iter(1..) creates an unbounded Stream of integers representing each page number.
- .map(fetch_page) of course will call fetch_page for each page number.
- .buffered(10) allows up to 10 fetch_page calls to occur concurrently while preserving the original order.
- .take_while(|page| future::ready(page.is_ok())) keeps the stream going until a fetch_page returns an error; it uses futures::future::ready since the function passed to take_while must return a future. (See the sketch after this list for adapting the stop condition to the page contents.)
- .map(|page| page.unwrap()) pulls out the successful pages; it won't panic because we know the stream stops once any error occurs.
- .collect() does essentially the same thing as for an iterator, except you have to .await it.
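The question's stop condition is based on the page contents ("not found") rather than on fetch_page returning an error, but the same shape works if the check moves into take_while. Below is a minimal sketch of that variation; the fetch stub, the "not found" marker, and the page-5 cutoff are assumptions made up for illustration, not part of the original code:

use futures::{future, stream, StreamExt};

// Stand-in for a real HTTP request: every page yields an HTML body, and
// pages past the end contain the text "not found" (as described in the question).
async fn fetch(page: u32) -> String {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    if page < 5 {
        format!("<html>content of page {}</html>", page)
    } else {
        String::from("<html>not found</html>")
    }
}

#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1u32..)
        .map(fetch)
        .buffered(10)
        // stop as soon as a body looks like a "not found" page
        .take_while(|body| future::ready(!body.contains("not found")))
        .collect()
        .await;

    println!("fetched {} pages", pages.len());
}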
Running the above code prints out the following, showing that it tries 10 at a time but will only return up to the first failure:
fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]
This glosses over some nice-to-haves like handling non-missing-page errors or retrying, but I hope this gives you a good foundation. In those cases you might reach for the methods on TryStreamExt, which specifically handle streams of Results.
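For example, one way to sketch that (the error variants, the cutoff values, and the fetch stub below are invented for illustration) is to treat "not found" as the expected end of the crawl while letting any other error abort the whole thing via try_collect from TryStreamExt:

use futures::{future, stream, StreamExt, TryStreamExt};

#[derive(Debug)]
enum CrawlError {
    NotFound,        // expected: we ran past the last page
    Network(String), // unexpected: should fail the whole crawl
}

async fn fetch_page(page: i32) -> Result<String, CrawlError> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    match page {
        p if p < 5 => Ok(format!("page {}", p)),
        5..=9 => Err(CrawlError::NotFound),
        _ => Err(CrawlError::Network(String::from("connection reset"))),
    }
}

#[tokio::main]
async fn main() -> Result<(), CrawlError> {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        // NotFound marks the normal end of the crawl, so stop the stream there
        .take_while(|res| future::ready(!matches!(res, Err(CrawlError::NotFound))))
        // everything left is either a page or a real error; try_collect gathers
        // the Ok values and returns early with the first Err it encounters
        .try_collect()
        .await?;

    println!("pages: {:?}", pages);
    Ok(())
}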