Splitting / Grouping DataFrame based on a date / time difference

Time:12-13

I have the following rows in a dataframe:

sender receiver bytes timestamp
A B 50 2147483647
C D 100 2147483648
A B 150 2147483657
C D 200 2147483658
A B 550 2147487657

Each record/row in that dataframe contains the amount of data that has been sent between a sender and receiver within a 10s time window. The timestamp marks when that individual time window started.

Now, I want to compute the amount of data between every pair of sender and receiver within a "flow". By a flow, I mean that data is continuously transferred between sender and receiver. If no data is transferred for a longer period of time (say 1 hour), I want the flows to be split. In the example above, I would like to get:

  • flow_AB_1 = 200 bytes
  • flow_CD_1 = 300 bytes
  • flow_AB_2 = 550 bytes

flow_AB_2 would be a separate flow, as 2147487657 - 2147483657 = 4000, which is greater than 3600.

Is there a way to achieve this with pyspark/Apache Spark?

CodePudding user response:

To solve your issue you can:

  • create a new column flow using the sessionization algorithm based on Spark's window described in this blog post
  • group by flow, sender and receiver to sum bytes
  • optionally, build your flows' names by concatenating sender, receiver and flow column

The complete code would be as follows:

from pyspark.sql import functions as F
from pyspark.sql import Window

window = Window.partitionBy('sender', 'receiver').orderBy('timestamp')

result = dataframe \
    .withColumn('flow_split', F.when(F.col('timestamp') - F.lag('timestamp').over(window) > 3600, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('flow', F.sum('flow_split').over(window)) \
    .groupby('sender', 'receiver', 'flow') \
    .agg(F.sum('bytes').alias('bytes')) \
    .select(
      F.concat(F.lit('flow_'), F.col('sender'), F.col('receiver'), F.lit('_'), F.col('flow') + 1).alias('flow'),
      F.col('bytes')
    )

With the input dataframe in your question, you will get the following result:

+---------+-----+
|flow     |bytes|
+---------+-----+
|flow_AB_1|200  |
|flow_AB_2|550  |
|flow_CD_1|300  |
+---------+-----+
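
If you want to check the gap logic without a Spark session, the same idea can be sketched in plain Python. This is only an illustration, not part of the Spark solution: the function name sum_flows and the max_gap parameter are made up for this example, and the rows are the ones from the question. Grouping by pair stands in for partitionBy, sorting for orderBy, and the running counter for the cumulative sum over flow_split.

```python
from collections import defaultdict

# Rows from the question: (sender, receiver, bytes, timestamp)
rows = [
    ("A", "B", 50, 2147483647),
    ("C", "D", 100, 2147483648),
    ("A", "B", 150, 2147483657),
    ("C", "D", 200, 2147483658),
    ("A", "B", 550, 2147487657),
]


def sum_flows(rows, max_gap=3600):
    """Sum bytes per flow.

    A new flow starts when the gap to the previous timestamp of the same
    (sender, receiver) pair is greater than max_gap seconds.
    sum_flows and max_gap are hypothetical names used for illustration.
    """
    # Rough equivalent of Window.partitionBy('sender', 'receiver')
    by_pair = defaultdict(list)
    for sender, receiver, nbytes, ts in rows:
        by_pair[(sender, receiver)].append((ts, nbytes))

    totals = {}
    for (sender, receiver), records in by_pair.items():
        records.sort()  # rough equivalent of orderBy('timestamp')
        flow = 0        # running count of splits seen so far
        prev_ts = None  # plays the role of lag('timestamp')
        for ts, nbytes in records:
            if prev_ts is not None and ts - prev_ts > max_gap:
                flow += 1
            name = f"flow_{sender}{receiver}_{flow + 1}"
            totals[name] = totals.get(name, 0) + nbytes
            prev_ts = ts
    return totals


result = sum_flows(rows)
```

Here result maps flow_AB_1 to 200, flow_AB_2 to 550 and flow_CD_1 to 300, matching the Spark output. Changing max_gap is the counterpart of changing the 3600 threshold in the when(...) expression.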