I am trying to compute the result of a PageRank algorithm, where each page's initial score is its number of outgoing links.
import org.jsoup.Jsoup
import scala.collection.JavaConverters._

// (URL, href) pairs for every anchor on every page, then grouped into
// (URL, List(outgoing URLs))
val links = warcs.map{ wr => wr._2.getRecord() }.
  flatMap{ wb =>
    val url = wb.getHeader().getUrl()
    val doc = Jsoup.parse(wb.getHttpStringBody())
    doc.select("a").asScala.map(l => (url, l.attr("href")))
  }.
  map(t => (t._1, List(t._2))).reduceByKey(_ ::: _)
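One aside, unrelated to the exception but relevant to the join later on: href attributes are frequently relative, so the destination keys produced here may never match the absolute URLs used as keys in ranks. If the header URL is absolute, Jsoup can resolve the links via its base-URI support; a small helper sketch (the helper name is mine):

import org.jsoup.Jsoup
import scala.collection.JavaConverters._

// Parse with the page URL as base URI so attr("abs:href") yields absolute URLs
def outgoingLinks(pageUrl: String, html: String): Seq[(String, String)] =
  Jsoup.parse(html, pageUrl).select("a[href]").asScala
    .map(a => (pageUrl, a.attr("abs:href"))).toSeq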
// Initial scores: (URL, number of outgoing links), keeping only pages
// that actually have outgoing links
var ranks = warcs.map{ wr => wr._2.getRecord() }.
  map{ wb => (wb.getHeader().getUrl(), Jsoup.parse(wb.getHttpStringBody()).select("a[href]").size()) }.
  filter{ l => l._2 > 0 }
The links RDD is of the form (URL, list of outgoing URLs) and ranks is of the form (URL, number of outgoing URLs).
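For reference, here is a toy illustration (made-up URLs, independent of the WARC data) of the shape that links.join(ranks) should produce:

val toyLinks = sc.parallelize(Seq(("a.com", List("b.com", "c.com"))))
val toyRanks = sc.parallelize(Seq(("a.com", 2)))
toyLinks.join(toyRanks).collect()
// => Array((a.com, (List(b.com, c.com), 2)))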
This is what the PageRank loop looks like:
for (i <- 1 to 10) {
  val contribs = links.join(ranks).flatMap { case (url, (links, rank)) =>
    links.map(dest => (dest, rank / links.size))
  }
  ranks = contribs.reduceByKey((x, y) => x + y).mapValues(sum => (0.15 + 0.85 * sum).toInt)
}
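One caveat about the arithmetic (an aside, not the cause of the exception): since ranks holds Ints, rank / links.size is integer division and (0.15 + 0.85 * sum).toInt truncates toward zero, so scores can collapse to 0 within a few iterations. A minimal sketch of the same loop with Double scores (dranks is my name for the converted RDD):

var dranks = ranks.mapValues(_.toDouble)  // start from the Int scores above
for (i <- 1 to 10) {
  val contribs = links.join(dranks).flatMap { case (url, (links, rank)) =>
    links.map(dest => (dest, rank / links.size))  // Double / Int => Double
  }
  dranks = contribs.reduceByKey(_ + _).mapValues(sum => 0.15 + 0.85 * sum)
}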
This being said, when I try to check the results of the ranking algorithm, I am met with an IndexOutOfBoundsException. I tried checking whether the resulting RDD is empty by printing ranks.isEmpty(), but I get the same exception.
Out of curiosity I also tried looking at the result of links.join(ranks), but the same exception occurs there as well.
What is going wrong with the join() operation, and how can I fix it?
CodePudding user response:
It turns out the problem was in how I created the RDD from the WARC files that I was using:
val warcs = sc.newAPIHadoopFile(
warcfile,
classOf[WarcGzInputFormat], // InputFormat
classOf[NullWritable], // Key
classOf[WarcWritable] // Value
).cache()
It turns out that removing .cache() stops the exceptions. I don't know why, though, so an explanation would still be welcome.
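A likely explanation, based on a documented Spark caveat rather than anything verified against this exact setup: Hadoop's RecordReader reuses the same Writable object for every record, so directly caching an RDD returned by newAPIHadoopFile creates many references to one object that keeps being mutated. The Spark docs recommend copying the values out with a map before caching, sorting, or aggregating. A sketch of that workaround, using the same accessors as above:

// Extract plain, immutable Strings from each WarcWritable before caching
val pages = sc.newAPIHadoopFile(
    warcfile,
    classOf[WarcGzInputFormat],
    classOf[NullWritable],
    classOf[WarcWritable]
  ).map { case (_, w) =>
    val rec = w.getRecord()
    (rec.getHeader().getUrl(), rec.getHttpStringBody())
  }.cache()

links and ranks can then be built from pages instead of re-reading warcs, so the cached values are ordinary Strings rather than reused Writables.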