I have an RDD of type (String, Iterable[GenericData.Record]). Now I want to save these iterables to paths based on the keys of this RDD. So for example, if the RDD contained
("a",[1,2,3,4])
("b",[5,6,7,8,9])
I need to persist [1,2,3,4] under result-path/a and [5,6,7,8,9] under result-path/b. One way to do this - which compiles but fails at runtime - is the following:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.foreach {
  case (key, collection) =>
    val reRDD = sc.makeRDD(collection.toSeq)
    reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
      classOf[SpecificRecord],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[SpecificRecord]],
      hadoopConf)
}
The problem here is that I cannot do this, since SparkContext is not serializable. re is an RDD, so invoking foreach on it serializes the inner lambda and sends it over to the worker nodes. So I am trying to think of a way to convert the initial re into a Map[String, RDD[GenericData.Record]] so that I can do the following:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re: Map[String, RDD[GenericData.Record]] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.foreach {
  case (key, rddCollection) =>
    rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
      classOf[SpecificRecord],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[SpecificRecord]],
      hadoopConf)
}
Answer:
Keys can be collected, and the original RDD filtered for each key:
// rdd is the original RDD[(String, Iterable[GenericData.Record])]
val re: Map[String, RDD[GenericData.Record]] = rdd
  .keys
  .collect()
  .map(key => key -> rdd.filter(_._1 == key).flatMap(_._2)) // flatten each Iterable into an RDD of records
  .toMap
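With that map in hand, the write becomes a plain driver-side loop over already-materialized RDDs, so no SparkContext ends up captured in a closure. A minimal sketch of that loop, assuming the uri, hadoopConf, and Avro output setup from the question (including the output schema already configured on hadoopConf, e.g. via AvroJob.setOutputKeySchema), and wrapping each record in an AvroKey/NullWritable pair so that saveAsNewAPIHadoopFile is available on the RDD:
import org.apache.avro.generic.GenericData
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.io.NullWritable

re.foreach {
  case (key, recordsRdd) =>
    // Runs on the driver: each value is already an RDD, so nothing here
    // needs to serialize the SparkContext into a closure.
    recordsRdd
      .map(record => (new AvroKey(record), NullWritable.get()))
      .saveAsNewAPIHadoopFile(s"$uri/$key",
        classOf[AvroKey[GenericData.Record]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[GenericData.Record]],
        hadoopConf)
}
Note that building the map this way scans the original RDD once per distinct key (one filter per key), so it is best suited to a modest number of keys; caching rdd before collecting the keys avoids recomputing it on every pass.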