I have a PySpark DataFrame with two columns, ID and count, where count is a dict/Map&lt;String, Int&gt;. The entries in count are not sorted; I am trying to sort them by value within the count column, keep only the top 4, and remove the remaining key-value pairs.
I have
ID count
3004000304 {'A' -> 2, 'B' -> 4, 'C' -> 5, 'D' -> 1, 'E' -> 9}
3004002756 {'B' -> 3, 'A' -> 8, 'D' -> 3, 'C' -> 8, 'E' -> 1}
I want something like this, where only the top 4 entries by value are kept in the count column:
ID count
3004000304 {'E' -> 9, 'C' -> 5, 'B' -> 4, 'A' -> 2}
3004002756 {'A' -> 8, 'C' -> 8, 'B' -> 3, 'D' -> 3}
My approach:
import operator
from pyspark.sql import functions as F

def sort_dict_f(x):
    # Sorts the map entries ascending by value; returns a list of (key, value) tuples.
    sorted_x = sorted(x.items(), key=operator.itemgetter(1))
    return sorted_x

SorterUDF = F.udf(sort_dict_f)  # no return type declared, so the output comes back as a string
df = old_df.withColumn('Sorted', SorterUDF("count"))
df.show()
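This runs but does not give the desired result: with no declared return type the UDF output is rendered as a string, the sort is ascending, and nothing is trimmed to 4 entries. A variant that addresses those points (the MapType schema and the descending sort are assumptions based on the desired output above) might look like:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType

def top4_dict(x):
    # Sort entries descending by value and keep the first 4 as a dict.
    return dict(sorted(x.items(), key=lambda kv: kv[1], reverse=True)[:4])

Top4UDF = F.udf(top4_dict, MapType(StringType(), IntegerType()))
df = old_df.withColumn('count', Top4UDF('count'))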
If it is not possible in PySpark, is it possible to convert to a pandas DataFrame and then sort the top 4 by value there? Any help is much appreciated.
CodePudding user response:
You can first explode the map, get the 4 rows with the highest count per ID, and then reconstruct it as a map.
df = df.select('id', F.explode('count')) \
    .withColumn('rn', F.expr('row_number() over (partition by id order by value desc)')) \
    .filter('rn <= 4') \
    .groupBy('id') \
    .agg(F.map_from_entries(F.collect_list(F.struct('key', 'value'))).alias('count'))
df.show(truncate=False)
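As for the pandas route the question asks about: it works too, but only if the data is small enough to collect to the driver. A minimal sketch, using the question's old_df as the starting DataFrame:

# toPandas() brings MapType columns over as Python dicts.
pdf = old_df.toPandas()
pdf['count'] = pdf['count'].apply(
    lambda d: dict(sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:4])
)

The Spark-native version above is preferable for anything large, since it avoids collecting the whole DataFrame to one machine.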