How can you view the result of RDD.join() in Scala?


I am trying to compute the result of a PageRank-style algorithm, where the initial score of a page is its number of outgoing links.

val links = warcs.map { wr => wr._2.getRecord() }.
               map { wb =>
                 val url = wb.getHeader().getUrl()
                 val d = Jsoup.parse(wb.getHttpStringBody())
                 val links = d.select("a").asScala
                 // One (source URL, target href) pair per anchor on the page
                 links.map(l => (url, l.attr("href"))).toIterator
               }.
               flatMap(identity).map(t => (t._1, List(t._2))).reduceByKey(_ ::: _)

var ranks = warcs.map{ wr => wr._2.getRecord()}.
                  map{ wb => (wb.getHeader().getUrl(), Jsoup.parse(wb.getHttpStringBody()).select("a[href]").size())}.
                  filter{ l => l._2 > 0}

The links RDD is of the form (URL, list of outgoing URLs) and ranks is of the form (URL, number of outgoing URLs).

This is what the PageRank loop looks like:

for (i <- 1 to 10) {
    val contribs = links.join(ranks).flatMap { case (url, (links, rank)) => links.map(dest => (dest, rank / links.size)) }

    ranks = contribs.reduceByKey((x, y) => x + y).mapValues(sum => (0.15 + 0.85 * sum).toInt)
}
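
To make the data flow of one iteration concrete, here is a minimal sketch on plain Scala collections rather than RDDs. The toy graph, the object name and the helper `step` are made up for illustration. The join, flatMap and reduceByKey steps are imitated with Map and Seq operations, and ranks are kept as Double here (an assumption; the loop above uses Int, where `rank / links.size` is integer division and truncates).

```scala
object PageRankStepSketch {
  // Hypothetical toy graph: page -> outgoing links (not from the original data).
  val links: Map[String, List[String]] = Map(
    "a" -> List("b", "c"),
    "b" -> List("c"),
    "c" -> List("a")
  )

  // One iteration: join links with ranks by key, spread each page's rank
  // evenly over its outgoing links, then sum per destination and damp.
  def step(ranks: Map[String, Double]): Map[String, Double] = {
    val contribs: Seq[(String, Double)] = for {
      (url, outgoing) <- links.toSeq
      rank <- ranks.get(url).toList        // inner join: pages without a rank drop out
      dest <- outgoing
    } yield (dest, rank / outgoing.size)

    contribs
      .groupBy(_._1)                       // plays the role of reduceByKey
      .map { case (dest, cs) => dest -> (0.15 + 0.85 * cs.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    // Initial score = number of outgoing links, as in the question.
    val initial = links.map { case (url, out) => url -> out.size.toDouble }
    step(initial).toSeq.sortBy(_._1).foreach(println)
  }
}
```

A page that nobody links to receives no contributions and therefore disappears from the result after one step, which is also how the RDD version behaves.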

However, when I try to check the results of the ranking algorithm, I get an IndexOutOfBoundsException. I tried checking whether the resulting RDD is empty by printing ranks.isEmpty(), and I get the same exception.

Out of curiosity, I also tried to view the result of links.join(ranks), but the same exception occurs.

What is going wrong with the join() operation, and how can I progress?
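
On the title question: RDD transformations such as join() are lazy, so nothing is computed until an action runs, and a failure anywhere upstream surfaces at whichever action is called first (isEmpty(), take(), collect(), ...). A small sample of a join is usually inspected with take(n). Below is a minimal local-mode sketch, assuming Spark is on the classpath; the data, the app name and the names `JoinPreviewSketch` and `joinedSample` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JoinPreviewSketch {
  // Builds two tiny pair RDDs, joins them by key and returns at most n rows.
  def joinedSample(sc: SparkContext, n: Int): Array[(String, (List[String], Int))] = {
    val links = sc.parallelize(Seq("a" -> List("b", "c"), "b" -> List("c")))
    val ranks = sc.parallelize(Seq("a" -> 2, "b" -> 1))
    val joined = links.join(ranks)   // still lazy: nothing has run yet
    joined.take(n)                   // action: brings at most n rows to the driver
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("join-preview")
    val sc = new SparkContext(conf)
    try joinedSample(sc, 5).foreach(println)
    finally sc.stop()
  }
}
```

take(n) is preferable to collect() on large data, because collect() pulls the entire RDD into driver memory.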

Turns out the problem was in how I was loading the WARC files that I was using:

val warcs = sc.newAPIHadoopFile(
              classOf[WarcGzInputFormat],             // InputFormat
              classOf[NullWritable],                  // Key
              classOf[WarcWritable]                   // Value
    ).cache()

Turns out removing .cache() stops the exceptions. I don't know why though, so an explanation would still be welcome.
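
A possible explanation, offered as an assumption rather than a confirmed diagnosis of this job: the Spark API documentation for newAPIHadoopFile notes that Hadoop's RecordReader re-uses the same Writable object for each record, so directly caching the returned RDD stores many references to one object, and it recommends copying the records with a map function before caching. The sketch below imitates that effect in plain Scala with a hypothetical mutable `Record` class; no Spark or Hadoop is involved.

```scala
object ReusedRecordSketch {
  // Hypothetical stand-in for a Hadoop Writable: a mutable holder.
  final class Record(var value: String)

  // Imitates a RecordReader: every element is the same object, mutated in place.
  def reader(lines: Seq[String]): Iterator[Record] = {
    val shared = new Record("")
    lines.iterator.map { line => shared.value = line; shared }
  }

  // Keeps the objects themselves, then reads them: all references see the last value.
  def keepObjects(lines: Seq[String]): List[String] =
    reader(lines).toList.map(_.value)

  // Copies the needed data out of each record before anything is stored.
  def copyFirst(lines: Seq[String]): List[String] =
    reader(lines).map(_.value).toList

  def main(args: Array[String]): Unit = {
    val lines = Seq("page-1", "page-2", "page-3")
    println(keepObjects(lines)) // List(page-3, page-3, page-3)
    println(copyFirst(lines))   // List(page-1, page-2, page-3)
  }
}
```

If this is what happened here, the equivalent change in the job would be to map each WarcWritable to the plain values that are needed (for example the URL and body strings) and call .cache() on that mapped RDD instead of on warcs directly.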

