I have a dataframe that looks like below
+-----------+--------------+----------------+--------------------+-----------------------+
|CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|mkt_channel_subcategory|
+-----------+--------------+----------------+--------------------+-----------------------+
|    1794405|             8|           email|               e_cat|                   send|
|   19911215|             9|           email|               e_cat|               delivery|
|   18907679|            10|           email|               e_cat|                   open|
|   18106624|            11|           email|               e_cat|                  click|
|    8335735|             8|           email|               e_cat|                   send|
|   17912034|            11|           email|               e_cat|                  click|
+-----------+--------------+----------------+--------------------+-----------------------+
I need to create 3 columns for each customer: total emails sent (send + delivery), open, and click.
I am using the code below, but it creates only a single count column:
df = df.groupBy('CUSTOMER_ID', 'mkt_channel_id').agg(F.count('mkt_channel_subcategory'))
My final table should have the following columns:
CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|mkt_channel_subcategory|sent|open|click
Can anyone tell me how to do this?
CodePudding user response:
It seems you want to pivot the column mkt_channel_subcategory. You can first merge the event values send and delivery, then pivot the column and count. Something like this, applied to your input example:
from pyspark.sql import functions as F

df = df.withColumn(
    "mkt_channel_subcategory",
    # fold "delivery" into "send" so both count toward the same column
    F.when(
        F.col("mkt_channel_subcategory") == "delivery",
        F.lit("send")
    ).otherwise(F.col("mkt_channel_subcategory"))
).groupby(
    "CUSTOMER_ID", "mkt_channel_id", "mkt_channel_name", "mkt_channel_category"
).pivot(
    "mkt_channel_subcategory"
).agg(F.count("*")).fillna(0)
df.show()
# +-----------+--------------+----------------+--------------------+-----+----+----+
# |CUSTOMER_ID|mkt_channel_id|mkt_channel_name|mkt_channel_category|click|open|send|
# +-----------+--------------+----------------+--------------------+-----+----+----+
# |   18907679|            10|           email|               e_cat|    0|   1|   0|
# |    1794405|             8|           email|               e_cat|    0|   0|   1|
# |   17912034|            11|           email|               e_cat|    1|   0|   0|
# |   19911215|             9|           email|               e_cat|    0|   0|   1|
# |    8335735|             8|           email|               e_cat|    0|   0|   1|
# |   18106624|            11|           email|               e_cat|    1|   0|   0|
# +-----------+--------------+----------------+--------------------+-----+----+----+
The same result can be achieved with a conditional aggregation:
df = df.groupby(
    "CUSTOMER_ID", "mkt_channel_id", "mkt_channel_name", "mkt_channel_category"
).agg(
    F.sum(F.when(F.col("mkt_channel_subcategory").isin("delivery", "send"), 1).otherwise(0)).alias("sent"),
    F.sum(F.when(F.col("mkt_channel_subcategory") == "open", 1).otherwise(0)).alias("open"),
    F.sum(F.when(F.col("mkt_channel_subcategory") == "click", 1).otherwise(0)).alias("click"),
)