Home > database >  aggregate pyspark dataframe and create multiple columns
aggregate pyspark dataframe and create multiple columns

Time:06-29

I have a dataframe that looks like below

 ----------- -------------- ---------------- -------------------- ----------------------- 
|CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|mkt_channel_subcategory|
 ----------- -------------- ---------------- -------------------- ----------------------- 
|    1794405|             8|           email|               e_cat|                   send|
|   19911215|             9|           email|               e_cat|               delivery|
|   18907679|            10|           email|               e_cat|                   open|
|   18106624|            11|           email|               e_cat|                  click|
|    8335735|             8|           email|               e_cat|                   send|
|   17912034|            11|           email|               e_cat|                  click|

I need to create 3 columns like total emails sent(send delivery), open, click for each customer

I am using the below code and it is creating only single column

df = d.groupfBy('CUSTOMER_ID','mkt_channel_id').agg(F.count('mkt_channel_subcategory'))

my final table should have the below columns

CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|mkt_channel_subcategory|sent|open|click

Can anyone tell me how to do this?

CodePudding user response:

It seems you want to pivot the column mkt_channel_subcategory. You can first merge the event values send and delivery then pivot the column and count.

Something like this applied to your input example:

from pyspark.sql import functions as F

df = df.withColumn(
    "mkt_channel_subcategory",
    F.when(
        F.col("mkt_channel_subcategory") == "delivery",
        F.lit("send")
    ).otherwise(F.col("mkt_channel_subcategory"))
).groupby(
    "CUSTOMER_ID", "mkt_channel_id", "mkt_channel_name", "mkt_channel_category"
).pivot(
    "mkt_channel_subcategory"
).agg(F.count("*")).fillna(0)

df.show()

#  ----------- -------------- ---------------- -------------------- ----- ---- ---- 
# |CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|click|open|send|
#  ----------- -------------- ---------------- -------------------- ----- ---- ---- 
# |   18907679|            10|           email|               e_cat|    0|   1|   0|
# |    1794405|             8|           email|               e_cat|    0|   0|   1|
# |   17912034|            11|           email|               e_cat|    1|   0|   0|
# |   19911215|             9|           email|               e_cat|    0|   0|   1|
# |    8335735|             8|           email|               e_cat|    0|   0|   1|
# |   18106624|            11|           email|               e_cat|    1|   0|   0|
#  ----------- -------------- ---------------- -------------------- ----- ---- ---- 

and the same could be achieved using a conditional aggregation:

df = df.groupby(
    "CUSTOMER_ID", "mkt_channel_id", "mkt_channel_name", "mkt_channel_category"
).agg(
    F.sum(F.when(F.col("mkt_channel_subcategory").isin("delivery", "send"), 1).otherwise(0)).alias("sent")
    # ... same for other columns
)
  • Related