How can I get the right RDD map output, when I iterate over a broadcast variable inside the RDD?


My code looks like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Sample").setMaster("local")
val sc = new SparkContext(conf)

val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,3),(3,4)))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Seq((2,1),(3,2),(7,6)))

val rdd2AsMap = rdd2.collectAsMap.toMap
val broadcastMap = sc.broadcast(rdd2AsMap)

val result = rdd1.map { case (x, y) =>
  for ((key, value) <- broadcastMap.value) {
    (x, key)
  }
}

result.saveAsTextFile("file:///home/cjohnson/output")

The expected output written to file should be:

(1,2)
(1,3)
(1,7)
(2,2)
(2,3)
(2,7)
(3,2)
(3,3)
(3,7)

But this is the output that actually gets written to the file:

()
()
()

How can I fix this?

P.S. This is just some small sample data I provided to demonstrate my issue. The actual data is much larger.

CodePudding user response:

  1. The inner for loop returns Unit, i.e. (), because you forgot to add yield:

     val a = for ((key, _) <- Map(1 -> "")) yield key

  2. You need flatMap instead of map to build the cross product between each RDD key and each broadcast map key (see the sketch right after this list).
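
Putting both points together, the original snippet could look roughly like this (a minimal sketch, assuming rdd1, broadcastMap and the output path from the question):

val result = rdd1.flatMap { case (x, _) =>
  // yield turns the for comprehension into a collection of pairs instead of Unit
  for (key <- broadcastMap.value.keys) yield (x, key)
}

result.saveAsTextFile("file:///home/cjohnson/output")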

Regarding your question, here is how I would approach it:

rdd1
  .keys
  .flatMap { rddKey =>
    broadcastMap
      .value
      .keys
      .map(broadcastKey => (rddKey, broadcastKey))
  }
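
As a usage sketch (the val name pairs is just illustrative), the result of that expression can be written out the same way as in the question; sorting first is optional, but it makes the written contents deterministic, since RDD output order across part files is otherwise not guaranteed:

val pairs = rdd1.keys.flatMap { rddKey =>
  broadcastMap.value.keys.map(broadcastKey => (rddKey, broadcastKey))
}

// Optional: sort so the written pairs come out in a predictable order
pairs.sortBy(identity).saveAsTextFile("file:///home/cjohnson/output")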

Later edit:

It could also be written as a Cartesian product:

rdd1
  .keys
  .cartesian(rdd2.keys)
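
A complete sketch of that variant, reusing rdd1, rdd2 and the output path from the question (the val name crossed is illustrative). Note that it skips collectAsMap and the broadcast entirely; when rdd2 is small enough to collect and broadcast, the flatMap approach above avoids the cost of cartesian, which has to combine every pair of partitions:

val crossed = rdd1.keys.cartesian(rdd2.keys)

crossed.saveAsTextFile("file:///home/cjohnson/output")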