PySpark groupby and create a column containing a dictionary of the other columns

Time:10-24

I have this PySpark dataframe:

df = spark.createDataFrame(
    [("a", "b", "v1", 1234, 56, 78, 9),
     ("a", "b", "v2", 987, 6, 543, 21),
     ("c", "d", "v1", 12, 345, 6, 789),
     ("c", "d", "v2", 9, 876, 5, 4321)],
    ("k1", "k2", "k3", "ca", "pa", "cb", "pb"))
df.show()

+---+---+---+----+---+---+----+
| k1| k2| k3|  ca| pa| cb|  pb|
+---+---+---+----+---+---+----+
|  a|  b| v1|1234| 56| 78|   9|
|  a|  b| v2| 987|  6|543|  21|
|  c|  d| v1|  12|345|  6| 789|
|  c|  d| v2|   9|876|  5|4321|
+---+---+---+----+---+---+----+

Basically, I want to transform this dataframe by grouping on the first two keys, k1 and k2, and using the third key, k3, as the top-level key of a dictionary that holds the values of the other columns (ca, pa, cb, pb). That dictionary would be stored in a new column. The result should look exactly like this:

+---+---+--------------------------------------------------------------------------------------------------+
|k1 |k2 |k3                                                                                                |
+---+---+--------------------------------------------------------------------------------------------------+
|c  |d  |{"v1": {"pa": 345, "pb": 789, "ca": 12, "cb": 6}, "v2": {"pa": 876, "pb": 4321, "ca": 9, "cb": 5}}|
|a  |b  |{"v1": {"pa": 56, "pb": 9, "ca": 1234, "cb": 78}, "v2": {"pa": 6, "pb": 21, "ca": 987, "cb": 543}}|
+---+---+--------------------------------------------------------------------------------------------------+
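
For reference, the target shape can also be written down in plain Python, without Spark, which may help when checking results on a small sample. This is only a sketch of the intended transformation on the four rows above; the helper name build_nested and its parameters are made up for illustration and are not part of any library.

```python
from collections import defaultdict

# Same rows and column names as the example dataframe, as plain tuples.
rows = [
    ("a", "b", "v1", 1234, 56, 78, 9),
    ("a", "b", "v2", 987, 6, 543, 21),
    ("c", "d", "v1", 12, 345, 6, 789),
    ("c", "d", "v2", 9, 876, 5, 4321),
]
columns = ("k1", "k2", "k3", "ca", "pa", "cb", "pb")


def build_nested(rows, columns, group_keys=("k1", "k2"), inner_key="k3"):
    """Hypothetical helper: group by group_keys, nest the rest under inner_key."""
    grouped = defaultdict(dict)
    for row in rows:
        record = dict(zip(columns, row))
        group = tuple(record.pop(k) for k in group_keys)
        inner = record.pop(inner_key)
        # What is left in record are the value columns: ca, pa, cb, pb.
        grouped[group][inner] = record
    return dict(grouped)


expected = build_nested(rows, columns)
print(expected[("a", "b")])
```

Each (k1, k2) pair maps to a dictionary keyed by k3, which is the structure the new column should contain once serialized to JSON.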

To do so I wrote the following code. I think it could be improved (using pandas_udf or something else), but I did not manage to find a better solution, so I'm looking for any advice or guidance that could lead to a more elegant and efficient one.

import json

from pyspark.sql import functions as F
from pyspark.sql import types as T

def reoganize_col(list_json):
    # Merge the collected single-entry maps ({k3: {...}}) into one dict
    col_data = {}
    for d in list_json:
        for k, v in d.items():
            col_data[k] = v
    return json.dumps(col_data)

udf_reoganize_col = F.udf(reoganize_col, T.StringType())

# The chain is wrapped in parentheses so that it can span several lines
df = (
    df.withColumn('x', F.create_map(F.lit('ca'), F.col('ca'),
                                    F.lit('cb'), F.col('cb'),
                                    F.lit('pa'), F.col('pa'),
                                    F.lit('pb'), F.col('pb')))
    .groupby(['k1', 'k2'])
    .agg(F.collect_list(F.create_map(F.col('k3'), F.col('x'))).alias('k3'))
)
df = df.withColumn('k3', udf_reoganize_col(F.col('k3')))

Answer:

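You're almost there. I'd suggest using to_json instead of a UDF to improve performance, since built-in functions avoid shipping every row to a Python worker, and using struct instead of map to make the code cleaner. Note that map_from_entries requires Spark 2.4 or later, and that the snippet starts from the original df.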

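from pyspark.sql import functions as F

(df
    .groupBy('k1', 'k2')
    # One (key, value) struct per row: key = k3, value = struct of the other columns
    .agg(F.collect_list(F.struct('k3', F.struct('pa', 'pb', 'ca', 'cb'))).alias('k3'))
    # Turn the array of (key, value) structs into a map, then serialize it to JSON
    .withColumn('k3', F.to_json(F.map_from_entries('k3')))
    .show(10, False)
)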

# Output
# +---+---+---------------------------------------------------------------------------------+
# |k1 |k2 |k3                                                                               |
# +---+---+---------------------------------------------------------------------------------+
# |c  |d  |{"v1":{"pa":345,"pb":789,"ca":12,"cb":6},"v2":{"pa":876,"pb":4321,"ca":9,"cb":5}}|
# |a  |b  |{"v1":{"pa":56,"pb":9,"ca":1234,"cb":78},"v2":{"pa":6,"pb":21,"ca":987,"cb":543}}|
# +---+---+---------------------------------------------------------------------------------+
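
One small difference from the output requested in the question: to_json writes compact JSON, while json.dumps (used in the UDF) puts a space after each comma and colon by default. The strings therefore differ as text but hold the same data. A quick check in plain Python, with both strings copied from the two outputs for the (a, b) group; the variable names are only illustrative:

```python
import json

# Expected string from the question (json.dumps style, with spaces).
wanted = ('{"v1": {"pa": 56, "pb": 9, "ca": 1234, "cb": 78}, '
          '"v2": {"pa": 6, "pb": 21, "ca": 987, "cb": 543}}')

# String produced by to_json in the answer (compact, no spaces).
produced = ('{"v1":{"pa":56,"pb":9,"ca":1234,"cb":78},'
            '"v2":{"pa":6,"pb":21,"ca":987,"cb":543}}')

same_text = wanted == produced                          # raw string comparison
same_data = json.loads(wanted) == json.loads(produced)  # comparison after parsing

print(same_text, same_data)
```

If a downstream consumer compares the column as raw text, it is probably safer to parse both sides first, as done here.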