My code looks like this:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Sample").setMaster("local")
val sc = new SparkContext(conf)
val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,3),(3,4)))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Seq((2,1),(3,2),(7,6)))
val rdd2AsMap = rdd2.collectAsMap.toMap
val broadcastMap = sc.broadcast(rdd2AsMap)
val result = rdd1.map { case (x, y) =>
  for ((key, value) <- broadcastMap.value) {
    (x, key)
  }
}
result.saveAsTextFile("file:///home/cjohnson/output")
The expected output written to file should be:
(1,2)
(1,3)
(1,7)
(2,2)
(2,3)
(2,7)
(3,2)
(3,3)
(3,7)
But I get this output written to file:
()
()
()
How can I fix this?
P.S. This is just some small sample data I provided to demonstrate my issue; the actual data is much larger.
CodePudding user response:
- That inner for returns Unit, i.e. (), because you forgot to add yield. A for-comprehension without yield desugars to foreach, so your map produced an RDD of Unit values, and each () was written to the file. With yield it produces a value:

val a = for ((key, _) <- Map(1 -> "")) yield key
- You need flatMap instead of map to make the product between each rdd key and each broadcast map key; map would leave you with one nested collection per element instead of a flat RDD of pairs. A combined fix for your snippet is sketched right after this list.
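Putting both points together, a minimal sketch of the fix applied to the question's snippet (same rdd1 and broadcastMap as above) could look like this:

val result = rdd1.flatMap { case (x, _) =>
  // iterate over the broadcast keys as an Iterable so yield builds a
  // collection of pairs (yielding pairs directly from the Map would
  // collapse them, since every pair here shares the same first element x)
  for (key <- broadcastMap.value.keys) yield (x, key)
}
result.saveAsTextFile("file:///home/cjohnson/output")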
Regarding your question, here is how I would approach it:
rdd1
  .keys
  .flatMap { rddKey =>
    broadcastMap
      .value
      .keys
      .map(broadcastKey => (rddKey, broadcastKey))
  }
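For reference, here is a self-contained version of this approach that runs in local mode and prints the pairs sorted; the object name ProductDemo and the setup boilerplate are just for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object ProductDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Sample").setMaster("local"))
    val rdd1 = sc.parallelize(Seq((1, 1), (2, 3), (3, 4)))
    val rdd2 = sc.parallelize(Seq((2, 1), (3, 2), (7, 6)))
    val broadcastMap = sc.broadcast(rdd2.collectAsMap.toMap)

    val result = rdd1.keys.flatMap { rddKey =>
      broadcastMap.value.keys.map(broadcastKey => (rddKey, broadcastKey))
    }

    // sorted so the nine pairs print deterministically:
    // (1,2) (1,3) (1,7) (2,2) (2,3) (2,7) (3,2) (3,3) (3,7)
    result.collect().sorted.foreach(println)
    sc.stop()
  }
}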
Later edit: it could also be written as a cartesian product:
rdd1
  .keys
  .cartesian(rdd2.keys)
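One trade-off between the two approaches: cartesian computes the cross product in a distributed way, so rdd2 never has to be collected to the driver and broadcast, at the cost of a shuffle across both key sets. When rdd2 is small enough to fit in memory, the broadcast version is usually faster; when it is large, cartesian avoids overloading the driver.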