I have a huge dataframe that looks similar to this:
+----+-------+-------+-----+
|name|level_A|level_B|hours|
+----+-------+-------+-----+
| Bob|     10|      3|    5|
| Bob|     10|      3|   15|
| Bob|     20|      3|   25|
| Sue|     30|      3|   35|
| Sue|     30|      7|   45|
+----+-------+-------+-----+
My desired output:
+----+--------------------+------------------+
|name|         map_level_A|       map_level_B|
+----+--------------------+------------------+
| Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
| Sue|          {30 -> 80}|{7 -> 45, 3 -> 35}|
+----+--------------------+------------------+
Meaning, group by name, adding 2 MapType columns that map level_A and level_B to the sum of hours.
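To pin down the semantics I'm after, here is a plain-Python sketch of the same aggregation on the sample rows. It only illustrates the expected result and is not something I could run on the real data; the names rows and level_maps are made up for this example.

```python
from collections import defaultdict

# Sample rows from the table above: (name, level_A, level_B, hours)
rows = [
    ("Bob", 10, 3, 5),
    ("Bob", 10, 3, 15),
    ("Bob", 20, 3, 25),
    ("Sue", 30, 3, 35),
    ("Sue", 30, 7, 45),
]


def level_maps(rows, level_index):
    """Map each name to {level value: total hours} for one level column.

    level_index is the tuple position of the level column (hypothetical helper).
    """
    maps = defaultdict(lambda: defaultdict(int))
    for row in rows:
        name, level, hours = row[0], row[level_index], row[3]
        maps[name][level] += hours
    # Convert the nested defaultdicts to plain dicts for easy comparison.
    return {name: dict(levels) for name, levels in maps.items()}


map_level_A = level_maps(rows, 1)  # keyed by level_A
map_level_B = level_maps(rows, 2)  # keyed by level_B
```

Each level column is aggregated independently of the others, which is why a join-based solution needs one aggregation per column.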
I know I can get that output using a UDF or a join operation.
However, in practice the data is very large, and there are not 2 map columns but tens of them, so a join or a UDF is just too costly.
Is there a more efficient way to do that?
Answer:
You could consider using Window functions. You'll need a window spec for each level_X, partitioned by both name and level_X, to calculate the sum of hours. Then group by name and create the map from an array of structs:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Assumes an active SparkSession named `spark`.
df = spark.createDataFrame(
    [("Bob", 10, 3, 5), ("Bob", 10, 3, 15), ("Bob", 20, 3, 25),
     ("Sue", 30, 3, 35), ("Sue", 30, 7, 45)],
    ["name", "level_A", "level_B", "hours"])

# One window per level column, partitioned by name and that level.
wla = Window.partitionBy("name", "level_A")
wlb = Window.partitionBy("name", "level_B")

# Attach the per-(name, level) sums, then collapse to one row per name.
# collect_set removes the duplicate (level, sum) structs before building the map.
result = df.withColumn("hours_A", F.sum("hours").over(wla)) \
    .withColumn("hours_B", F.sum("hours").over(wlb)) \
    .groupBy("name") \
    .agg(
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_A"), F.col("hours_A")))
        ).alias("map_level_A"),
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_B"), F.col("hours_B")))
        ).alias("map_level_B")
    )

result.show()
# +----+--------------------+------------------+
# |name|         map_level_A|       map_level_B|
# +----+--------------------+------------------+
# | Sue|          {30 -> 80}|{3 -> 35, 7 -> 45}|
# | Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
# +----+--------------------+------------------+