Tricky pyspark value sorting

Time:03-23

I want to sort the contents of an RDD (example given below) using PySpark. Input RDD:

[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]

Expected output RDD:

[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]

Two sorts are happening: the keys of each dictionary are sorted, and each list of values is sorted as well.

I am not sure how to solve this with PySpark. I tried the code below, but I guess it is not correct, because I believe sorting everything is not a good idea when the data is huge. Please help.

def sort_values(x):
    return (x[0], dict(sorted(x[1].items())))    
rdd1 = input_rdd.map(lambda x: sort_values(x))

CodePudding user response:

Here is an example using mapPartitions, which applies a function to each partition of the RDD.

The code below sorts within each partition: for every record, the dictionary keys and each list of values are sorted.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd = sc.parallelize([
    ('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
    ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),
    ('a', {2: [16, 4, 456], 7: [343, 3262]}),
    ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]}),
], 4)

def sortedpartition(iterator):
    # Called once per partition; returns that partition's records with sorted contents.
    sorted_rdd_partition = []
    for item in iterator:
        word = item[0]
        values = item[1]
        orderDict = {}
        # Insert keys in ascending order (dicts keep insertion order in Python 3.7+).
        for key in sorted(values.keys()):
            orderDict[key] = sorted(values[key])  # sort each inner list
        sorted_rdd_partition.append((word, orderDict))
    return sorted_rdd_partition

rdd.mapPartitions(sortedpartition).collect()

The final result looks like this:
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
 ('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
 ('a', {2: [4, 16, 456], 7: [343, 3262]}),
 ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
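
Since each record is sorted independently of the others, the same per-record logic can also be factored into a plain function and applied lazily. The following is a minimal sketch, not part of the original answer: `sort_record` and `sort_partition` are hypothetical names, and the sample records are checked locally without Spark. The generator avoids holding a whole partition's output in a list.

```python
def sort_record(record):
    """Return (word, dict) with dict keys in ascending order and each list sorted."""
    word, values = record
    # Dicts preserve insertion order (Python 3.7+), so inserting keys in
    # sorted order yields a key-ordered dict.
    return (word, {key: sorted(values[key]) for key in sorted(values)})


def sort_partition(iterator):
    """Yield sorted records one at a time instead of building a list."""
    for record in iterator:
        yield sort_record(record)


# Local check on plain Python data (no Spark needed).
sample = [
    ('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
    ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),
]
result = list(sort_partition(iter(sample)))
print(result)
```

With an RDD, either `rdd.map(sort_record)` or `rdd.mapPartitions(sort_partition)` should give the same records. Both are narrow transformations, so no data moves between partitions and the size of the whole dataset does not affect the cost of sorting a single record.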

If you also need the records sorted across the whole RDD, use one of the lines below:

rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or
rdd.mapPartitions(sortedpartition).sortByKey().collect()

Output:

[('a', {2: [4, 16, 456], 7: [343, 3262]}),
 ('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
 ('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
 ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
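
As a rough local analogy (not how Spark executes it, since Spark sorts across partitions with a shuffle), the RDD-level sort orders whole records by their first element and leaves the inner dictionaries untouched. The sketch below uses a plain Python list; `records` is a hypothetical stand-in for the collected output of the partition-level step.

```python
# Records as they come out of the partition-level sort (inner contents already sorted).
records = [
    ('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
    ('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
    ('a', {2: [4, 16, 456], 7: [343, 3262]}),
    ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]}),
]

# Order records by the word, mirroring the key function passed to sortBy.
by_word = sorted(records, key=lambda x: x[0])
print([word for word, _ in by_word])
```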

CodePudding user response:

Here is an approach that uses a DataFrame and a UDF:

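import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}), ('z',{17: [126, 54, 9], 7: [3292, 1235, 7], 0:[7654, 1768]})]})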

    val
0   (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1   (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2   (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})

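We can't use a map for the output, because a map column is not guaranteed to keep the sorted key order. So after sorting, the UDF returns the entries as an array of (key, values) tuples instead.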

df=spark.createDataFrame(pdf)

def sort_dict_f(x):
    # x is the (word, dict) value of the 'val' column.
    sorted_array = []
    for key in sorted(x[1].keys()):  # sort the keys
        sorted_array.append((key, sorted(x[1][key])))  # sort each internal list
    return (x[0], sorted_array)

schema = StructType([
    StructField("word", StringType(), False),
    StructField("vals", ArrayType(StructType([
        StructField("key", IntegerType(), False),
        StructField("subs", ArrayType(IntegerType()), False),
    ])), False),
])

SorterUDF = F.udf(sort_dict_f, schema)
df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)

The final result looks like this:

+------------------------------------------------------------------+------------------------------------------------------------------+
|val                                                               |sorted                                                            |
+------------------------------------------------------------------+------------------------------------------------------------------+
|[x, [0 -> [5390, 3262], 2 -> [9, 8, 7], 3 -> [16, 11, 4532]]]     |[x, [[0, [3262, 5390]], [2, [7, 8, 9]], [3, [11, 16, 4532]]]]     |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]]           |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]]           |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
+------------------------------------------------------------------+------------------------------------------------------------------+
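
The conversion that the UDF performs can be checked on plain Python data before registering it with Spark. This is a small sketch under that assumption: `to_sorted_pairs` is a hypothetical helper name, and the row is passed as an ordinary tuple rather than a Spark Row.

```python
def to_sorted_pairs(row):
    """Turn (word, dict) into (word, [(key, sorted_list), ...]) ordered by key."""
    word, mapping = row
    # A list of pairs carries its order explicitly, whereas a map column
    # is not guaranteed to keep one.
    return (word, [(key, sorted(mapping[key])) for key in sorted(mapping)])


row = ('z', {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})
converted = to_sorted_pairs(row)
print(converted)
```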
