PySpark Split Dataframe into Sliding Windows with intervals


I have the following dataframe as input

+----+-------------------+
| id |               date|
+----+-------------------+
|   A|2016-03-11 09:00:00|
|   B|2016-03-11 09:00:07|
|   C|2016-03-11 09:00:18|
|   D|2016-03-11 09:00:21|
|   E|2016-03-11 09:00:39|
|   F|2016-03-11 09:00:44|
|   G|2016-03-11 09:00:49|
+----+-------------------+

and I would like to partition it into 10-second windows that ignore intervals of time larger than the window itself (basically as if the counter restarts).
Here's an example of the expected output:

+----+-------------------+-----+
| id |               date|group|
+----+-------------------+-----+
|   A|2016-03-11 09:00:00|  1  |
|   B|2016-03-11 09:00:07|  1  |
|   C|2016-03-11 09:00:18|  2  |
|   D|2016-03-11 09:00:21|  2  |
|   E|2016-03-11 09:00:39|  3  |
|   F|2016-03-11 09:00:44|  3  |
|   G|2016-03-11 09:00:49|  4  |
+----+-------------------+-----+

As opposed to fixed timeslicing, which would group them like so:

+----+-------------------+-----+
| id |               date|group|
+----+-------------------+-----+
|   A|2016-03-11 09:00:00|  1  |
|   B|2016-03-11 09:00:07|  1  |
|   C|2016-03-11 09:00:18|  2  |
|   D|2016-03-11 09:00:21|  3  |
|   E|2016-03-11 09:00:39|  4  |
|   F|2016-03-11 09:00:44|  5  |
|   G|2016-03-11 09:00:49|  5  |
+----+-------------------+-----+

I tried looking for solutions with the functions rowsBetween and rangeBetween, or with some variations based on time differences, but I couldn't find a proper solution.

That's probably because I don't know the proper terminology for this kind of windowing (it's neither tumbling nor rolling).

The closest I got was using the window function, but there were two problems:

  • It performs a fixed timeslicing, which is not what I'm looking for
  • I'm not sure how to assign the ID or how to aggregate it correctly

Here's the code I tried (it uses the original df with an additional 'val' column of random values):

from pyspark.sql.functions import window, sum

w = df.groupBy(window("date", "10 seconds")).agg(sum("val").alias("sum"))
w.select(w.window.start.cast("string").alias("start"),
         w.window.end.cast("string").alias("end"), "sum").collect()
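For reference, the closest I can get to assigning a group with that approach is the sketch below (assuming 'date' is already a timestamp column); it reproduces the fixed timeslicing table above, not the grouping I want:

from pyspark.sql.functions import col, window, dense_rank
from pyspark.sql.window import Window

# attach the fixed 10-second bucket to each row, then number the buckets
fixed = (df
         .withColumn('bucket', window('date', '10 seconds'))
         .withColumn('group', dense_rank().over(Window.orderBy(col('bucket.start')))))
fixed.show(truncate=False)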

Any help would be greatly appreciated.

CodePudding user response:

You can do it, but it is expensive: it requires multiple window functions, some of them without partitioning (which forfeits much of Spark's parallelism). To achieve it regardless:

  1. Group the rows that are less than 10 seconds apart from each other (group1).
  2. Within each of those groups, group the rows by how many multiples of 10 seconds they are from the lowest date in that group (group2).
  3. Take a dense_rank over those two groups to get the final grouping (group3).
  4. Tweak to your liking.

I've left in the intermediate columns for clarity.

from pyspark.sql.functions import col, lag, sum, when, min, floor, dense_rank
from pyspark.sql.window import Window

data = [
    ('B', '2016-03-11 09:00:07'),
    ('C', '2016-03-11 09:00:18'),
    ('A', '2016-03-11 09:00:00'),
    ('D', '2016-03-11 09:00:21'),
    ('E', '2016-03-11 09:00:39'),
    ('F', '2016-03-11 09:00:44'),
    ('G', '2016-03-11 09:00:49')
]

df = spark.createDataFrame(data, ['id', 'date'])
df = df.withColumn('date', df['date'].astype('timestamp'))

w = Window.orderBy('date')                         # global ordering, no partitioning
w2 = Window.partitionBy('group1').orderBy('date')  # ordering within each gap-based group
w3 = Window.orderBy('group1', 'group2')            # ordering over both grouping levels

df = (df
      # seconds elapsed since the previous row (null for the first row)
      .withColumn('diff', col('date').astype('long') - lag(col('date').astype('long')).over(w))
      .withColumn('diff2', when(col('diff').isNull(), 0).otherwise(col('diff')))
      # flag rows where the gap to the previous row exceeds 10 seconds
      .withColumn('diff3', (col('diff2') > 10).astype('int'))
      # running sum of the flags -> gap-based group (group1)
      .withColumn('group1', sum('diff3').over(w))
      # 10-second bucket relative to the earliest date within group1 (group2)
      .withColumn('group2', floor((col('date').astype('long') - min(col('date').astype('long')).over(w2)) / 10))
      # dense rank over (group1, group2) -> final group id (group3)
      .withColumn('group3', dense_rank().over(w3))
)

# output
+---+-------------------+----+-----+-----+------+------+------+
|id |date               |diff|diff2|diff3|group1|group2|group3|
+---+-------------------+----+-----+-----+------+------+------+
|A  |2016-03-11 09:00:00|null|0    |0    |0     |0     |1     |
|B  |2016-03-11 09:00:07|7   |7    |0    |0     |0     |1     |
|C  |2016-03-11 09:00:18|11  |11   |1    |1     |0     |2     |
|D  |2016-03-11 09:00:21|3   |3    |0    |1     |0     |2     |
|E  |2016-03-11 09:00:39|18  |18   |1    |2     |0     |3     |
|F  |2016-03-11 09:00:44|5   |5    |0    |2     |0     |3     |
|G  |2016-03-11 09:00:49|5   |5    |0    |2     |1     |4     |
+---+-------------------+----+-----+-----+------+------+------+
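
If you only need the final column, a small follow-up (just a sketch) is to drop the helper columns and rename group3 to match the expected output:

# keep only the final grouping, renamed to 'group' as in the expected output
result = df.select('id', 'date', col('group3').alias('group')).orderBy('date')
result.show(truncate=False)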