Grateful for some help here. I am using PySpark (cannot use SQL, please). I have a list of tuples stored as an RDD of key/value pairs:
[(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
With, for example, ('City5', '2020-03-25', 'X5') as the key and 15 as the value of the last pair.
I would like to obtain the following outcome:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
Please notice that the outcome displays:
The key(s) with the max value for each city. That's the hardest part: the same city must appear twice if it has the same max value on different dates. I'm assuming I cannot use reduceByKey() since the key is not unique; maybe groupBy() or filter()?
Sorted in the following order:
- Descending largest value
- Ascending date
- Descending city name (ex: City1)
So I have tried the following code:
res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
Although it does give me the cities with their max value, it doesn't return the same city twice (because it is reduced by key: city) when a city has the same max value on two different dates.
I hope it's clear enough; if anything needs clarification, please ask! Thanks so much!
CodePudding user response:
To keep all cities whose value equals the max value, you can still use reduceByKey, but over arrays instead of over plain values:
- you transform your rows into key/value pairs, the value being an array of tuples instead of a single tuple
- you reduce by key with reduceByKey, merging the two arrays if they hold the same value, otherwise keeping the array that has the max value
- you flatten your value arrays with flatMap, merging the key back into each tuple
- finally you perform your sort
Complete code would be as follows:
def merge(array1, array2):
    # Keep the array holding the larger value; concatenate when the values tie
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1 + array2
    else:
        return array2

# key by city, value is a one-element array of (date, key, value)
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
# per city, keep every (date, key, value) tuple sharing the max value
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
# flatten back into (city, date, key, value) rows
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[0], y[1], y[2]), x[1]))
# sort by value descending, then date ascending, then city ascending
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[1], a[0]))
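For completeness, here is a minimal end-to-end sketch of that pipeline, assuming a SparkContext named sc and the input list from the question; the printed result matches the desired outcome:
data = [(('City1', '2020-03-27', 'X1'), 44), (('City1', '2020-03-28', 'X1'), 44),
        (('City3', '2020-03-28', 'X3'), 15), (('City4', '2020-03-27', 'X4'), 5),
        (('City4', '2020-03-26', 'X4'), 4), (('City2', '2020-03-26', 'X2'), 14),
        (('City2', '2020-03-25', 'X2'), 4), (('City4', '2020-03-25', 'X4'), 1),
        (('City1', '2020-03-29', 'X1'), 1), (('City5', '2020-03-25', 'X5'), 15)]
rdd2 = sc.parallelize(data)
result = (rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
              .reduceByKey(merge)  # merge() as defined above
              .flatMap(lambda x: [(x[0], y[0], y[1], y[2]) for y in x[1]])
              .sortBy(lambda a: (-a[3], a[1], a[0])))
print(result.collect())
# [('City1', '2020-03-27', 'X1', 44), ('City1', '2020-03-28', 'X1', 44),
#  ('City5', '2020-03-25', 'X5', 15), ('City3', '2020-03-28', 'X3', 15),
#  ('City2', '2020-03-26', 'X2', 14), ('City4', '2020-03-27', 'X4', 5)]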
CodePudding user response:
Can you work with / output DataFrames?
List = [(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
rdd = sc.parallelize(List)
import pyspark.sql.functions as F
# expand the (_1, _2, _3) key struct into columns and alias the count as 'value'
df = rdd\
    .toDF()\
    .select('_1.*', F.col('_2').alias('value'))\
    .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))  # value desc, date asc, city desc
df.show(truncate=False)
+-----+----------+---+-----+
|_1   |_2        |_3 |value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
|City2|2020-03-25|X2 |4    |
|City4|2020-03-26|X4 |4    |
|City4|2020-03-25|X4 |1    |
|City1|2020-03-29|X1 |1    |
+-----+----------+---+-----+
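Note that this only orders the rows; it does not keep just the per-city max as the desired outcome requires. One way to add that, as a sketch using the column names produced by toDF() above, is to compute each city's max value and join it back before ordering:
# compute each city's max value and keep only the rows matching it
max_per_city = df.groupBy('_1').agg(F.max('value').alias('max_value'))
df_max = (df.join(max_per_city, on='_1')
            .filter(F.col('value') == F.col('max_value'))
            .drop('max_value')
            .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1')))
df_max.show(truncate=False)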
CodePudding user response:
You can transform your RDD to a DataFrame and then use a Spark window to get the max value for each city, filter the rows using this value, and finally order your DataFrame as you wish:
from pyspark.sql import functions as F
from pyspark.sql import Window
# window spanning all rows of each city's partition
window = Window.partitionBy('city').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = rdd.toDF().select(
    F.col('_1._1').alias('city'),
    F.col('_1._2').alias('date'),
    F.col('_1._3').alias('key'),
    F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))\
 .filter(F.col('value') == F.col('max_value'))\
 .drop('max_value')\
 .orderBy(F.desc('value'), F.asc('date'), F.asc('city'))
And you get the following dataframe with your input rdd:
+-----+----------+---+-----+
|city |date      |key|value|
+-----+----------+---+-----+
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
+-----+----------+---+-----+
If you need an RDD at the end of the process, you can retrieve it with the .rdd method:
df.rdd
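df.rdd gives you an RDD of Row objects; if you want the original ((city, date, key), value) pair shape back, a small map (just a sketch) does it:
# sketch: turn each Row back into a ((city, date, key), value) pair
pair_rdd = df.rdd.map(lambda r: ((r['city'], r['date'], r['key']), r['value']))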