I have a Spark DataFrame with two columns:
colA colB
1 3
1 2
2 4
2 5
2 1
I want to groupBy colA and iterate over the colB list for each group such that:
res = 0
for i in collect_list(col("colB")):
    res = i * (3 + res)
The returned value should be res, so I get:
colA colB
1 24
2 78
How can I do this in Scala?
CodePudding user response:
You can achieve the result you want with the following (Spark 3.0+, where the aggregate higher-order function is available):
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, 3), (1, 2), (2, 4), (2, 5), (2, 1)).toDF("colA", "colB")

val retDf = df
  .groupBy("colA")
  .agg(
    aggregate(
      collect_list("colB"), lit(0), (acc, nxt) => nxt * (acc + 3)
    ) as "colB")
You need to be very careful with this however, as data on Spark is distributed. If the data has been shuffled since being read into Spark, there is no guarantee that it will be collected in the same order. In the toy example, collect_list("colB") will return Seq(3, 2) where colA is 1. If there had been any shuffles at an earlier phase, however, collect_list can just as well return Seq(2, 3), which would give you 27 instead of the desired 24. You need to attach some metadata to your rows that lets you process them in the order you expect, for example with the monotonically_increasing_id function, as sketched below.
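A minimal sketch of that idea, assuming Spark 3.x and the df defined above (the rowId column name is just for illustration): tag each row with monotonically_increasing_id() before any shuffle, collect (rowId, colB) pairs, sort each group's array by rowId, and only then fold:
import org.apache.spark.sql.functions._

// Tag rows with an id reflecting their original order (increasing within
// each partition, with the partition index in the high bits); this must
// happen before any shuffle.
val withId = df.withColumn("rowId", monotonically_increasing_id())

val orderedDf = withId
  .groupBy("colA")
  .agg(
    aggregate(
      // Collect (rowId, colB) structs, sort them by rowId (array_sort on
      // structs compares fields left to right), then drop the id again.
      transform(
        array_sort(collect_list(struct(col("rowId"), col("colB")))),
        s => s.getField("colB")
      ),
      lit(0),
      (acc, nxt) => nxt * (acc + 3)
    ) as "colB")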
CodePudding user response:
RDD approach with no loss of ordering.
%scala
val rdd1 = spark.sparkContext
  .parallelize(Seq((1, 3), (1, 2), (2, 4), (2, 5), (2, 1)))
  .zipWithIndex()                         // attach the original row index
  .map(x => (x._1._1, (x._1._2, x._2)))   // key by colA, keep (colB, index)
val rdd2 = rdd1.groupByKey
// Convert each group to an Array.
val rdd3 = rdd2.map(x => (x._1, x._2.toArray))
// Restore the original order within each group by sorting on the index.
val rdd4 = rdd3.map(x => (x._1, x._2.sortBy(_._2)))
// Drop the index, keeping only the colB values.
val rdd5 = rdd4.mapValues(v => v.map(_._1))
rdd5.collect()
// foldLeft guarantees left-to-right evaluation, unlike fold.
val res = rdd5.map(x => (x._1, x._2.foldLeft(0)((acc, nxt) => nxt * (acc + 3))))
res.collect()
returns:
res201: Array[(Int, Int)] = Array((1,24), (2,78))
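If you need the result back as a DataFrame at the end, a small addition (assuming spark.implicits._ is in scope for the RDD-to-DataFrame conversion):
import spark.implicits._
val resDf = res.toDF("colA", "colB")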