I want to do sorting inside an RDD (example given below) using PySpark. Input RDD:
[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]
Output rdd:
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
Sorting happens at two levels: the dictionary keys must be sorted, and so must each list of values.
I am not sure how to do this using PySpark. I have tried the code below, but I think it is incorrect (and when the data is huge, sorting everything is probably not a good idea). Please help.
def sort_values(x):
    # this sorts only the dictionary keys, not the lists inside each value
    return (x[0], dict(sorted(x[1].items())))

rdd1 = input_rdd.map(lambda x: sort_values(x))
CodePudding user response:
Here is an example using the mapPartitions function, which you can call on an RDD. mapPartitions applies a function to each partition of the RDD.
The code below performs the sorting inside each record, i.e. the dictionary keys and each inner list of values are sorted -
rdd = sc.parallelize([('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
                      ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),
                      ('a', {2: [16, 4, 456], 7: [343, 3262]}),
                      ('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})], 4)

def sortedpartition(iterator):
    sorted_rdd_partition = []
    for item in iterator:
        word = item[0]
        values = item[1]
        orderDict = {}
        for key in sorted(values.keys()):          # sort the dictionary keys
            orderDict[key] = sorted(values[key])   # sort each inner list of values
        sorted_rdd_partition.append((word, orderDict))
    return sorted_rdd_partition

rdd.mapPartitions(sortedpartition).collect()
The final result looks like below.
Output -
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('a', {2: [4, 16, 456], 7: [343, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
If you also need RDD-level sorting, i.e. ordering the records themselves by key, use the line below (note that this triggers a shuffle across partitions):
rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or
rdd.mapPartitions(sortedpartition).sortByKey().collect()
Output -
[('a', {2: [4, 16, 456], 7: [343, 3262]}),
('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
CodePudding user response:
Here's something you can do:
import pandas as pd

pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
                            ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),
                            ('z', {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})]})
                                                            val
0                       (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1               (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2    (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})
We can't use a map type in the output, since a Spark map column wouldn't preserve the sorted key order, so each dict is converted internally to an array of (key, values) tuples after sorting:
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType)

df = spark.createDataFrame(pdf)

def sort_dict_f(x):
    sorted_array = []
    for key in sorted(x[1].keys(), reverse=False):       # sort the keys
        sorted_array.append((key, sorted(x[1][key])))    # sort each inner list
    return (x[0], sorted_array)

schema = StructType([
    StructField("word", StringType(), False),
    StructField("vals", ArrayType(StructType([
        StructField('key', IntegerType(), False),
        StructField('subs', ArrayType(IntegerType()), False)
    ])), False)
])

SorterUDF = F.udf(sort_dict_f, schema)

df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)
The final result looks like this:
+-----------------------------------------------------------------+------------------------------------------------------------------+
|val                                                              |sorted                                                            |
+-----------------------------------------------------------------+------------------------------------------------------------------+
|[x, [0 -> [5390, 3262], 3 -> [16, 11, 4532]]]                    |[x, [[0, [3262, 5390]], [3, [11, 16, 4532]]]]                     |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]]          |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]]           |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
+-----------------------------------------------------------------+------------------------------------------------------------------+
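If you then need the result back as an RDD of (word, dict) pairs like in the question, one possible follow-up (my sketch, not part of the answer above; it relies on Python 3.7+ dicts preserving insertion order):
result_rdd = df2.select('sorted.word', 'sorted.vals').rdd \
    .map(lambda row: (row['word'],
                      {r['key']: list(r['subs']) for r in row['vals']}))
result_rdd.collect()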