PySpark - create multiple aggregative map columns without using UDF or join


I have a huge dataframe that looks similar to this:

+----+-------+-------+-----+
|name|level_A|level_B|hours|
+----+-------+-------+-----+
| Bob|     10|      3|    5|
| Bob|     10|      3|   15|
| Bob|     20|      3|   25|
| Sue|     30|      3|   35|
| Sue|     30|      7|   45|
+----+-------+-------+-----+

My desired output:

+----+--------------------+------------------+
|name|         map_level_A|       map_level_B|
+----+--------------------+------------------+
| Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
| Sue|          {30 -> 80}|{7 -> 45, 3 -> 35}|
+----+--------------------+------------------+

Meaning: group by name, and add two MapType columns that map each level_A / level_B value to the sum of hours for that (name, level) pair. For example, Bob has level_A = 10 on rows with hours 5 and 15, so map_level_A contains 10 -> 20.

I know I can get that output using a UDF or a join operation.

However, in practice the data is very big, and there are not 2 map columns but tens of them, so a UDF or a join per column is just too costly.
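
For reference, here is a minimal sketch of the join-based version I want to avoid (one aggregation plus one join per map column; the variable names are just for illustration):

import pyspark.sql.functions as F

# One groupBy + join per map column -- this is what gets too expensive
# once there are tens of level columns.
map_a = (df.groupBy("name", "level_A")
           .agg(F.sum("hours").alias("sum_hours"))
           .groupBy("name")
           .agg(F.map_from_entries(
               F.collect_list(F.struct("level_A", "sum_hours"))
           ).alias("map_level_A")))

map_b = (df.groupBy("name", "level_B")
           .agg(F.sum("hours").alias("sum_hours"))
           .groupBy("name")
           .agg(F.map_from_entries(
               F.collect_list(F.struct("level_B", "sum_hours"))
           ).alias("map_level_B")))

result = map_a.join(map_b, on="name")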

Is there a more efficient way to do that?

CodePudding user response:

You could use window functions. Define a window spec for each level_X column, partitioned by both name and level_X, to compute the sum of hours per (name, level) pair. Then group by name and build each map from the collected (level, sum) structs:

from pyspark.sql import Window
import pyspark.sql.functions as F

df = spark.createDataFrame(
    [("Bob", 10, 3, 5), ("Bob", 10, 3, 15), ("Bob", 20, 3, 25),
     ("Sue", 30, 3, 35), ("Sue", 30, 7, 45)],
    ["name", "level_A", "level_B", "hours"])

# One window per level column, partitioned by name and that level,
# so the summed hours can be attached to every row without a join.
wla = Window.partitionBy("name", "level_A")
wlb = Window.partitionBy("name", "level_B")

result = df.withColumn("hours_A", F.sum("hours").over(wla)) \
    .withColumn("hours_B", F.sum("hours").over(wlb)) \
    .groupBy("name") \
    .agg(
        # The window sum is repeated on every row of its partition,
        # so collect_set deduplicates the (level, sum) pairs before
        # map_from_entries turns them into a map.
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_A"), F.col("hours_A")))
        ).alias("map_level_A"),
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_B"), F.col("hours_B")))
        ).alias("map_level_B")
    )

result.show()

# +----+--------------------+------------------+
# |name|         map_level_A|       map_level_B|
# +----+--------------------+------------------+
# | Sue|          {30 -> 80}|{3 -> 35, 7 -> 45}|
# | Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
# +----+--------------------+------------------+
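
Since the question mentions tens of level columns, the same pattern extends with a plain loop instead of a join per column. A sketch, assuming the real columns follow the level_X naming (adjust level_cols to the actual names):

level_cols = ["level_A", "level_B"]  # extend with the real level columns

tmp = df
aggs = []
for c in level_cols:
    # Attach the per-(name, level) sum to every row via a window...
    w = Window.partitionBy("name", c)
    tmp = tmp.withColumn(f"hours_{c}", F.sum("hours").over(w))
    # ...and queue up the corresponding map aggregation.
    aggs.append(
        F.map_from_entries(
            F.collect_set(F.struct(F.col(c), F.col(f"hours_{c}")))
        ).alias(f"map_{c}")
    )

result = tmp.groupBy("name").agg(*aggs)

This keeps everything in one DataFrame pipeline with a single final groupBy, avoiding a separate join per map column.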