I have the following rows in a dataframe:
| sender | receiver | bytes | timestamp  |
|--------|----------|-------|------------|
| A      | B        | 50    | 2147483647 |
| C      | D        | 100   | 2147483648 |
| A      | B        | 150   | 2147483657 |
| C      | D        | 200   | 2147483658 |
| A      | B        | 550   | 2147487657 |
Each record/row in that dataframe contains the amount of data that has been sent between a sender and receiver within a 10s time window. The timestamp marks when that individual time window started.
Now I want to compute the amount of data transferred between every pair of sender and receiver within a "flow". By a flow, I mean a period during which data is continuously transferred between a sender and a receiver. If no data is transferred for a longer period of time (say, 1 hour), the flow should be split. For the example above, I would like to get:
- flow_AB_1 = 200 bytes
- flow_CD_1 = 300 bytes
- flow_AB_2 = 550 bytes
flow_AB_2 would be a separate flow, since 2147487657 - 2147483657 = 4000, which is greater than 3600.
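For reference, the example dataframe can be built like this (a minimal sketch, assuming an active SparkSession named `spark`):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each row: data sent between sender and receiver in one 10s window
dataframe = spark.createDataFrame(
    [
        ('A', 'B', 50, 2147483647),
        ('C', 'D', 100, 2147483648),
        ('A', 'B', 150, 2147483657),
        ('C', 'D', 200, 2147483658),
        ('A', 'B', 550, 2147487657),
    ],
    ['sender', 'receiver', 'bytes', 'timestamp'],
)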
Is there a way to achieve this with pyspark/Apache Spark?
CodePudding user response:
To solve your issue you can:
- create a new column `flow` using the sessionization algorithm based on Spark's window described in this blog post
- group by `flow`, `sender` and `receiver` to sum `bytes`
- optionally, build your flows' names by concatenating the `sender`, `receiver` and `flow` columns
The complete code would be as follows:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Order each sender/receiver pair's records chronologically
window = Window.partitionBy('sender', 'receiver').orderBy('timestamp')

result = dataframe \
    .withColumn('flow_split', F.when(F.col('timestamp') - F.lag('timestamp').over(window) > 3600, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('flow', F.sum('flow_split').over(window)) \
    .groupBy('sender', 'receiver', 'flow') \
    .agg(F.sum('bytes').alias('bytes')) \
    .select(
        # flow ids from the running sum are 0-based, hence the + 1 in the name
        F.concat(F.lit('flow_'), F.col('sender'), F.col('receiver'), F.lit('_'), F.col('flow') + 1).alias('flow'),
        F.col('bytes')
    )
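To see how the session ids are assigned, take the A-B pair: the gaps between consecutive timestamps are 10 and 4000 seconds, so `flow_split` is 0, 0, 1 and its running sum yields flow ids 0, 0, 1. Adding 1 when building the name gives flow_AB_1 (50 + 150 = 200 bytes) and flow_AB_2 (550 bytes).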
With the input dataframe in your question, you will get the following result:
+---------+-----+
|flow     |bytes|
+---------+-----+
|flow_AB_1|200  |
|flow_AB_2|550  |
|flow_CD_1|300  |
+---------+-----+
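As a side note, if you are on Spark 3.2 or later, the built-in `session_window` function can express the sessionization without a manual lag/cumulative-sum. A minimal sketch, assuming the epoch-second `timestamp` column is cast to a timestamp type (the gap boundary semantics may differ slightly from the strict `> 3600` comparison above):

from pyspark.sql import functions as F

# Group rows into sessions separated by inactivity gaps of 1 hour
result = dataframe \
    .withColumn('event_time', F.col('timestamp').cast('timestamp')) \
    .groupBy('sender', 'receiver', F.session_window('event_time', '1 hour')) \
    .agg(F.sum('bytes').alias('bytes'))

Here each resulting row carries a `session_window` struct column with the session's start and end, which you could use instead of the concatenated flow name.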