scala spark reduce list in groupby


I have a Spark DataFrame with two columns:

colA colB
1    3
1    2
2    4
2    5
2    1

I want to groupBy colA and iterate over the colB list for each group such that:

res = 0
for i in collect_list(col("colB")):
    res = i * (3 + res)

The returned value for each group shall be res,

so I get:

colA colB
1    24
2    78
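(For reference, the recurrence above is just a left fold over each group's values. It can be checked against the expected output in plain Scala, without Spark:)

```scala
// The per-group reduction: res starts at 0, and each element i
// updates it as res = i * (3 + res).
def reduceGroup(xs: Seq[Int]): Int =
  xs.foldLeft(0)((res, i) => i * (3 + res))

// The two groups from the example, in row order:
val group1 = Seq(3, 2)    // colA == 1
val group2 = Seq(4, 5, 1) // colA == 2

println(reduceGroup(group1)) // 24
println(reduceGroup(group2)) // 78
```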

How can I do this in Scala?

CodePudding user response:

You can achieve the result you want with the following:


import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1,3), (1,2), (2,4), (2,5), (2,1)).toDF("colA", "colB")
val retDf = df
  .groupBy("colA")
  .agg(
    aggregate(
      collect_list("colB"), lit(0), (acc, nxt) => nxt * (acc + 3)
    ) as "colB")

You need to be very careful with this, however, because data on Spark is distributed. If the data has been shuffled since being read into Spark, there is no guarantee it will be collected in the same order. In the toy example, collect_list("colB") returns Seq(3, 2) where colA is 1, but if there had been a shuffle at an earlier phase, collect_list could just as well return Seq(2, 3), which would give you 27 instead of the desired 24. To guarantee the order you expect, you need to attach ordering metadata to your rows, for example with the monotonically_increasing_id function.
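As a sketch of that idea (assuming Spark 3.x, where the higher-order functions aggregate, transform and array_sort are available): tag each row with monotonically_increasing_id before any shuffle, collect (idx, colB) structs, sort them by idx, and only then fold. Note that array_sort on an array of structs orders by the first field, which is why idx is placed first.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1,3), (1,2), (2,4), (2,5), (2,1)).toDF("colA", "colB")

// Tag each row with an id that reflects read order, before any shuffle.
val withId = df.withColumn("idx", monotonically_increasing_id())

val retDf = withId
  .groupBy("colA")
  .agg(
    aggregate(
      // Collect (idx, colB) structs, sort by idx, then keep only colB.
      transform(
        array_sort(collect_list(struct(col("idx"), col("colB")))),
        s => s.getField("colB")
      ),
      lit(0),
      (acc, nxt) => nxt * (acc + 3)
    ) as "colB")
```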

CodePudding user response:

RDD approach with no loss of ordering.

%scala
%scala
// zipWithIndex tags each element with its original position, so the
// ordering can be restored after the shuffle caused by groupByKey.
val rdd1 = spark.sparkContext
  .parallelize(Seq((1,3), (1,2), (2,4), (2,5), (2,1)))
  .zipWithIndex()
  .map(x => (x._1._1, (x._1._2, x._2)))
val rdd2 = rdd1.groupByKey
// Convert to Array and restore the original order within each group.
val rdd3 = rdd2.map(x => (x._1, x._2.toArray))
val rdd4 = rdd3.map(x => (x._1, x._2.sortBy(_._2)))
val rdd5 = rdd4.mapValues(v => v.map(_._1))
rdd5.collect()

val res = rdd5.map(x => (x._1, x._2.fold(0)((acc, nxt) => nxt * (acc + 3))))
res.collect()

This returns:

res201: Array[(Int, Int)] = Array((1,24), (2,78))