RDD[(String,Iterable[GenericData.Record])] to Map[String,RDD[GenericData.Record]]

Time:05-18

I have an RDD of type (String, Iterable[GenericData.Record]). I want to save these iterables to paths derived from the keys of the RDD. For example, if the RDD contained

("a",[1,2,3,4])
("b",[5,6,7,8,9])

I need to persist [1,2,3,4] under result-path/a and [5,6,7,8,9] under result-path/b. One way to do this - which compiles but fails at runtime - is the following:

implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)

re.foreach {
   case (key,collection) =>
       val reRDD = sc.makeRDD(collection)
       reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
        classOf[SpecificRecord],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecord]],
        hadoopConf)

}

The problem here is that I cannot do this, since SparkContext is not serializable: re is an RDD, so invoking foreach on it serializes the inner lambda - which captures sc - and ships it to the worker nodes. So I am trying to think of a way to convert the initial re into a Map[String,RDD[GenericData.Record]] so that I can do the following:

implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re: Map[String, RDD[GenericData.Record]] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)

re.foreach {
   case (key,rddCollection) =>
       rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
        classOf[SpecificRecord],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[SpecificRecord]],
        hadoopConf)

}

CodePudding user response:

The keys can be collected on the driver, and the original RDD filtered once per key. Note that values on the filtered RDD would give RDD[Iterable[GenericData.Record]], so the grouped iterables are flattened with flatMap to get RDD[GenericData.Record]:

val re = rdd
  .keys
  .collect()
  .map(v => v -> rdd.filter(_._1 == v).flatMap(_._2))
  .toMap