Home > Net >  PySpark Grouping and Aggregating based on A Different Column?
PySpark Grouping and Aggregating based on A Different Column?

Time:04-23

I'm working on a problem where I have a dataset in the following format (replaced real data for example purposes):

session activity timestamp
1 enter_store 2022-03-01 23:25:11
1 pay_at_cashier 2022-03-01 23:31:10
1 exit_store 2022-03-01 23:55:01
2 enter_store 2022-03-02 07:15:00
2 pay_at_cashier 2022-03-02 07:24:00
2 exit_store 2022-03-02 07:35:55
3 enter_store 2022-03-05 11:07:01
3 exit_store 2022-03-05 11:22:51

I would like to be able to compute counting statistics for these events based on the pattern observed within each session. For example, based on the table above, the count of each pattern observed would be as follows:

{
    'enter_store -> pay_at_cashier -> exit_store': 2, 
    'enter_store -> exit_store': 1
}

I'm trying to do this in PySpark, but I'm having some trouble figuring out the most efficient way to do this kind of pattern matching where some steps are missing. The real problem involves a much larger dataset of ~15M events like this.

I've tried logic in the form of filtering the entire DF for unique sessions where 'enter_store' is observed, and then filtering that DF for unique sessions where 'pay_at_cashier' is observed. That works fine, the only issue is I'm having trouble thinking of ways where I can count the sessions like 3 where there is only a starting step and final step, but no middle step.

Obviously one way to do this brute-force would be to iterate over each session and assign it a pattern and increment a counter, but I'm looking for more efficient and scalable ways to do this.

Would appreciate any suggestions or insights.

CodePudding user response:

For Spark 2.4 , you could do

df = (df
      .withColumn("flow", F.expr("sort_array(collect_list(struct(timestamp, activity)) over (partition by session))"))
      .withColumn("flow", F.expr("concat_ws(' -> ', transform(flow, v -> v.activity))"))
      .groupBy("flow").agg(F.countDistinct("session").alias("total_session"))
      )
df.show(truncate=False)

#  ------------------------------------------- ------------- 
# |flow                                       |total_session|
#  ------------------------------------------- ------------- 
# |enter_store -> pay_at_cashier -> exit_store|2            |
# |enter_store -> exit_store                  |1            |
#  ------------------------------------------- ------------- 

The first block was collecting list of timestamp and its activity for each session in an ordered array (be sure timestamp is timestamp format) based on its timestamp value. After that, use only the activity values from the array using transform function (and combine them to create a string using concat_ws if needed) and group them by the activity order to get the distinct sessions.

  • Related