How to get some elements from each group after using groupBy in Spark

Time:02-17

I have a Spark RDD. Suppose it has 1000 elements that can be grouped into 10 groups. What I want to do is select the 2 elements in each group that meet my special requirement, and then get a new RDD with 20 elements.

Suppose the RDD data looks like this:

((1,a1),
(1,a2),
(1,a3),
...
(1,a100),
(2,b1),
(2,b2),
(2,b3)
...
(2,b100))

What I want is:

((1,a1),
(1,a99),
(2,b1),
(2,b99)
)

I select a1, a99, b1, and b99 with a function called my_func.

I think the code may be something like:

myrdd.groupBy(x => x._1)....(my_func)...

CodePudding user response:

I'm not convinced you need groupBy, and I'm not sure of the structure of your RDD.

This uses my own contrived data, so you will need to adapt it:

import org.apache.spark.rdd.RDD

// Generate some sample data. This is my data; adapt it to yours.
val rdd = spark.sparkContext.parallelize(Seq((1, "x"), (2, "y"), (3, "z"), (4, "z"), (5, "bbb")))

// Values to keep.
val l = List("x", "y", "z")

// Keep only the pairs whose value is in the list.
// Could be inlined or done via mapPartitions.
def my_function(l: List[String], r: RDD[(Int, String)]): RDD[(Int, String)] = {
  r.filter(x => l.contains(x._2))
}

// Run it all.
val rdd2 = my_function(l, rdd)
rdd2.collect

returns:

res24: Array[(Int, String)] = Array((1,x), (2,y), (3,z), (4,z))

CodePudding user response:

I strongly discourage you from using groupBy() or even mapPartitions() on a big dataset when you need to subsequently aggregate your data. The purpose of RDDs and the MapReduce programming model is to distribute computation: computing the max/min/sum etc. in the driver or on a single node means you are effectively using Spark only to read data from distributed storage such as HDFS, not for distributed processing.

Besides, there are many ways to perform your task, but trying to find one pattern that fits every type of aggregation you need is the wrong approach and will inevitably make your code inefficient.
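
To make the point about grouping concrete, here is a small local sketch in plain Python, not Spark. The helpers pick_with_grouping and pick_with_reduce are hypothetical names invented for this illustration; they only mimic, on one machine, how a group-then-select approach has to buffer every value of a key before choosing, while a reduce-style approach keeps a single running value per key.

```python
from collections import defaultdict

def pick_with_grouping(pairs):
    """Group-then-select: buffer every value of a key, then take the minimum."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)  # every value of the key is kept in memory
    peak_buffered = max(len(values) for values in groups.values())
    return {key: min(values) for key, values in groups.items()}, peak_buffered

def pick_with_reduce(pairs):
    """Reduce-style: keep only the current minimum for each key."""
    best = {}
    for key, value in pairs:
        if key not in best or value < best[key]:
            best[key] = value  # one running value per key, nothing else
    return best

# Two keys with 100 integer values each (illustrative data only).
pairs = [(key, n) for key in (1, 2) for n in range(100, 0, -1)]

grouped_result, grouped_peak = pick_with_grouping(pairs)
reduced_result = pick_with_reduce(pairs)

print(grouped_result == reduced_result)  # both approaches agree on the answer
print(grouped_peak)                      # values buffered for the largest group
```

Both functions return the same minimum per key, but the grouping version held all 100 values of a key at once. On a real cluster that buffering happens on whichever node receives the group, which is the cost the paragraph above warns about.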

Here is a possible PySpark solution for the problem you have:

rdd.reduceByKey(lambda x, y: x if x < y else y)\
    .union(rdd.reduceByKey(lambda x, y: x if x > y else y)).sortByKey().collect()

In the first reduceByKey I find the smallest value for each key, and in the second one the biggest value for each key. Then I union them and, if necessary, sort the resulting RDD to obtain the result you showed us.
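
As a sanity check on the snippet above, here is a minimal local sketch in plain Python, not PySpark. The function reduce_by_key is a hypothetical stand-in that only imitates what reduceByKey computes, and the data follows the question's example under the assumption that the values are strings such as "a1" through "a100". Because strings compare lexicographically, the largest value per key comes out as "a99" rather than "a100", which happens to match the output the question asks for.

```python
from functools import reduce
from itertools import groupby

def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: fold the values of each key with func."""
    ordered = sorted(pairs, key=lambda kv: kv[0])
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=lambda kv: kv[0])
    ]

# Data shaped like the question's example (values assumed to be strings).
pairs = [(1, f"a{i}") for i in range(1, 101)] + [(2, f"b{i}") for i in range(1, 101)]

smallest = reduce_by_key(pairs, lambda x, y: x if x < y else y)
largest = reduce_by_key(pairs, lambda x, y: x if x > y else y)

# Stand-in for union + sortByKey. Sorting whole tuples also orders the values
# within a key, which sortByKey alone does not promise.
result = sorted(smallest + largest)
print(result)  # [(1, 'a1'), (1, 'a99'), (2, 'b1'), (2, 'b99')]
```

If the values were numeric, or if my_func selects on something other than a plain minimum and maximum, the two comparison lambdas would need to change accordingly.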
