Pyspark display max value(s) and multiple sorting


Grateful for some help here. I'm using PySpark (cannot use SQL, please). I have a list of tuples stored as an RDD of key/value pairs:

[(('City1', '2020-03-27', 'X1'), 44),
 (('City1', '2020-03-28', 'X1'), 44),
 (('City3', '2020-03-28', 'X3'), 15),
 (('City4', '2020-03-27', 'X4'), 5),
 (('City4', '2020-03-26', 'X4'), 4),
 (('City2', '2020-03-26', 'X2'), 14),
 (('City2', '2020-03-25', 'X2'), 4),
 (('City4', '2020-03-25', 'X4'), 1),
 (('City1', '2020-03-29', 'X1'), 1),
 (('City5', '2020-03-25', 'X5'), 15)]

Here, for example, ('City5', '2020-03-25', 'X5') is the key and 15 is the value of the last pair.

I would like to obtain the following outcome:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

Please note that the outcome displays:

  • The key(s) with the max value for each city. That's the hardest part: the same city has to appear twice when it has the same max value on different dates. I'm assuming I cannot use reduceByKey() since the key is not unique; maybe groupBy() or filter()?

  • In the following sort order:

  1. Value, descending (largest first)
  2. Date, ascending
  3. City name, descending (e.g. City1)

So I have tried the following code:

# re-key by (city, key); the value becomes (date, value)
res = rdd2.map(lambda x: ((x[0][0], x[0][2]), (x[0][1], x[1])))
# keep a single (date, value) per (city, key): the one with the max value
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
# sort by value, descending
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
# then sort by date, ascending
rdd5 = rdd4.sortBy(lambda a: a[1][0])

Although it does give me the cities with the max value, it doesn't return the same city twice (because it is reduced by key) when the same city has the same max value on 2 different dates.
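
For example (a quick check, assuming rdd2 holds the pairs listed above), after the reduceByKey step there is only one entry left per (city, key), so one of the two City1 rows with value 44 is already gone:

# only one (date, value) survives per (city, key); which of the two
# 44-value dates is kept depends on the reduce order
rdd3.filter(lambda kv: kv[0][0] == 'City1').collect()
# e.g. [(('City1', 'X1'), ('2020-03-27', 44))]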

I hope it's clear enough; if anything needs more precision, please ask! Thanks so much!

CodePudding user response:

To keep all the rows whose value equals the per-city max value, you can still use reduceByKey, but over arrays instead of over single values:

  • you transform your rows into key/value pairs, with the value being an array containing one tuple instead of a bare tuple
  • you reduce by key with reduceByKey, merging the arrays when they carry the same value, otherwise keeping the array with the max value
  • you flatten the value arrays with flatMap, merging the key back into each tuple
  • finally, you perform your sort

The complete code would be as follows:

def merge(array1, array2):
    # both arrays hold (date, key, value) tuples; compare on the value
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        # same max value: keep both entries
        return array1 + array2
    else:
        return array2


# key by city; the value is an array with a single (date, key, value) tuple
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
# per city, keep the array(s) carrying the max value (ties are concatenated)
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
# flatten back to (city, date, key, value) tuples
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[0], y[1], y[2]), x[1]))
# sort by value descending, then date ascending, then city ascending
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[1], a[0]))
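
Collecting rdd5 should then print the rows in the order requested in the question (a quick check, using the sample RDD from the question):

for row in rdd5.collect():
    print(row)
# ('City1', '2020-03-27', 'X1', 44)
# ('City1', '2020-03-28', 'X1', 44)
# ('City5', '2020-03-25', 'X5', 15)
# ('City3', '2020-03-28', 'X3', 15)
# ('City2', '2020-03-26', 'X2', 14)
# ('City4', '2020-03-27', 'X4', 5)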

CodePudding user response:

Can you work with / output DataFrames?

List = [(('City1', '2020-03-27', 'X1'), 44),
        (('City1', '2020-03-28', 'X1'), 44),
        (('City3', '2020-03-28', 'X3'), 15),
        (('City4', '2020-03-27', 'X4'), 5),
        (('City4', '2020-03-26', 'X4'), 4),
        (('City2', '2020-03-26', 'X2'), 14),
        (('City2', '2020-03-25', 'X2'), 4),
        (('City4', '2020-03-25', 'X4'), 1),
        (('City1', '2020-03-29', 'X1'), 1),
        (('City5', '2020-03-25', 'X5'), 15)]

rdd = sc.parallelize(List)

import pyspark.sql.functions as F

# '_1.*' expands the struct into _1 (city), _2 (date), _3 (key)
df = rdd\
        .toDF()\
        .select('_1.*', F.col('_2').alias('value'))\
        .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))

df.show(truncate=False)

+-----+----------+---+-----+
|_1   |_2        |_3 |value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
|City2|2020-03-25|X2 |4    |
|City4|2020-03-26|X4 |4    |
|City4|2020-03-25|X4 |1    |
|City1|2020-03-29|X1 |1    |
+-----+----------+---+-----+
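
This sorts all the rows but does not yet keep only the max value per city. One way to add that on top of this DataFrame (a sketch, assuming the df defined above; max_per_city and df_max are just illustrative names) is a groupBy aggregation plus a join instead of a window:

# per-city max value, then keep only the rows that reach it
max_per_city = df.groupBy('_1').agg(F.max('value').alias('max_value'))

df_max = df.join(max_per_city, on='_1')\
           .filter(F.col('value') == F.col('max_value'))\
           .drop('max_value')\
           .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))

df_max.show(truncate=False)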

CodePudding user response:

You can transform your rdd into a dataframe and then use a Spark window to get the max value for each city, filter the rows using this value, and finally order your dataframe as you wish:

from pyspark.sql import functions as F
from pyspark.sql import Window

# window covering every row of a city's partition
window = Window.partitionBy('city').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = rdd.toDF().select(
    F.col('_1._1').alias('city'),
    F.col('_1._2').alias('date'),
    F.col('_1._3').alias('key'),
    F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))\
 .filter(F.col('value') == F.col('max_value'))\
 .drop('max_value')\
 .orderBy(F.desc('value'), F.asc('date'), F.asc('city'))

And you get the following dataframe with your input rdd:

+-----+----------+---+-----+
|city |date      |key|value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
+-----+----------+---+-----+

If you need an RDD at the end of the process, you can retrieve it with the .rdd property:

df.rdd
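
Note that df.rdd yields an RDD of Row objects; if you want plain tuples back in the original ((city, date, key), value) shape, a small map will do (a sketch; rdd_result is just an illustrative name):

rdd_result = df.rdd.map(lambda row: ((row['city'], row['date'], row['key']), row['value']))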