Max value for each key in an RDD


I have this data and want to get the maximum value for each key. The key is the first element of the inner tuple (9, 14, 26).

(('14', '51600', 'Fashion Week'), 1)
(('9', '61577', 'Guiding Light'), 7)
(('9', '6856', 'Adlina Marie'), 22)
(('14', '120850', 'People Say (feat. Redman)'), 5)
(('26', '155571', "Thinking 'Bout You"), 30)
(('26', '156532', "Hello"), 8)

The desired output format is:

(9, '6856', 'Adlina Marie', 22)
(14, '120850', 'People Say (feat. Redman)', 5)
(26, '155571', "Thinking 'Bout You", 30)

How do I select the first column as the key and the last as the value, and then find the maximum of the values per key? I tried

rdd.groupByKey(lambda x: int(x[0][0])).mapValues(lambda x: max(x))

but it takes the second column as the value when finding the max.

CodePudding user response:

You could use map before and after the aggregation:

rdd = rdd.map(lambda x: (x[0][0], (x[1], x[0][1], x[0][2])))  # key by first element, count first in value
rdd = rdd.groupByKey().mapValues(max)                         # keep the max value tuple per key
rdd = rdd.map(lambda x: (x[0], x[1][1], x[1][2], x[1][0]))    # reshape to (key, id, title, count)
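
This works because Python's max compares tuples element by element, so the count in the first position decides which record survives. For example:

max([(1, '51600'), (22, '6856'), (5, '120850')])  # returns (22, '6856')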

Full example:

sc = spark.sparkContext
data = [(('14', '51600', 'Fashion Week'), 1),
        (('9', '61577', 'Guiding Light'), 7),
        (('9', '6856', 'Adlina Marie'), 22),
        (('14', '120850', 'People Say (feat. Redman)'), 5),
        (('26', '155571', "Thinking 'Bout You"), 30),
        (('26', '156532', "Hello"), 8)]
rdd = sc.parallelize(data)

rdd = rdd.map(lambda x: (x[0][0], (x[1], x[0][1], x[0][2])))
print(rdd.collect())
# [('14', (1, '51600', 'Fashion Week')), ('9', (7, '61577', 'Guiding Light')), ('9', (22, '6856', 'Adlina Marie')), ('14', (5, '120850', 'People Say (feat. Redman)')), ('26', (30, '155571', "Thinking 'Bout You")), ('26', (8, '156532', 'Hello'))]

rdd = rdd.groupByKey().mapValues(max)
print(rdd.collect())
# [('14', (5, '120850', 'People Say (feat. Redman)')), ('9', (22, '6856', 'Adlina Marie')), ('26', (30, '155571', "Thinking 'Bout You"))]

rdd = rdd.map(lambda x: (x[0], x[1][1], x[1][2], x[1][0]))
print(rdd.collect())
# [('14', '120850', 'People Say (feat. Redman)', 5), ('9', '6856', 'Adlina Marie', 22), ('26', '155571', "Thinking 'Bout You", 30)]
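
As an aside, the same result can be computed with reduceByKey, which merges values pairwise instead of collecting whole groups per key. Here is a minimal sketch of that variant, casting the key to int so it matches the desired output above:

result = sc.parallelize(data)\
    .map(lambda x: (int(x[0][0]), (x[1], x[0][1], x[0][2])))\
    .reduceByKey(max)\
    .map(lambda x: (x[0], x[1][1], x[1][2], x[1][0]))
print(result.collect())
# e.g. [(14, '120850', 'People Say (feat. Redman)', 5), (9, '6856', 'Adlina Marie', 22), (26, '155571', "Thinking 'Bout You", 30)] (order may vary)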

CodePudding user response:

If working with RDDs is not a requirement, here is another approach using a Spark DataFrame with a window function:

df = spark.createDataFrame(
    [
    (('14', '51600', 'Fashion Week'), 1)
    ,(('9', '61577', 'Guiding Light'), 7)
    ,(('9', '6856', 'Adlina Marie'), 22)
    ,(('14', '120850', 'People Say (feat. Redman)'), 5)
    ,(('26', '155571', "Thinking 'Bout You"), 30)
    ,(('26', '156532', "Hello"), 8)
    ],['key','value']
)
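
Passing tuples like this makes key a struct column whose fields Spark auto-names _1, _2, _3, which is what the select below refers to:

df.printSchema()
# root
#  |-- key: struct (nullable = true)
#  |    |-- _1: string (nullable = true)
#  |    |-- _2: string (nullable = true)
#  |    |-- _3: string (nullable = true)
#  |-- value: long (nullable = true)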

from pyspark.sql import functions as F
from pyspark.sql import Window

df\
    .select(F.col('key._1').alias('key_1'),
            F.col('key._2').alias('key_2'),
            F.col('key._3').alias('key_3'),
            F.col('value'))\
    .withColumn('max', F.max(F.col('value')).over(Window.partitionBy('key_1')))\
    .filter(F.col('value')==F.col('max'))\
    .select('key_1', 'key_2', 'key_3', 'value')\
    .show()

+-----+------+--------------------+-----+
|key_1| key_2|               key_3|value|
+-----+------+--------------------+-----+
|   14|120850|People Say (feat....|    5|
|   26|155571|  Thinking 'Bout You|   30|
|    9|  6856|        Adlina Marie|   22|
+-----+------+--------------------+-----+
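
One caveat: the value == max filter keeps every row that ties for the maximum within a key. If exactly one row per key is needed, a row_number over the same partition (ordered by value descending) is a common way to pick a single winner; a sketch, reusing the df and imports above:

w = Window.partitionBy('key_1').orderBy(F.col('value').desc())

df\
    .select(F.col('key._1').alias('key_1'),
            F.col('key._2').alias('key_2'),
            F.col('key._3').alias('key_3'),
            F.col('value'))\
    .withColumn('rn', F.row_number().over(w))\
    .filter(F.col('rn')==1)\
    .drop('rn')\
    .show()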