Spark Window Function to build timeline

Time:12-10

I am trying to build a timeline and I want to be able to detect discontinuities (gaps) in it. I have this test df:

ID date
1 2012-12-01
1 2012-12-02
1 2012-12-03
1 2012-12-05
1 2012-12-06
1 2012-12-07
1 2012-12-10
1 2012-12-11

And I would like to get a timeline with start and end dates like this:

ID date end
1 2012-12-01 2012-12-03
1 2012-12-05 2012-12-07
1 2012-12-10 2012-12-11

I've been trying with:

columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'), 
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]

dftest = spark.createDataFrame(data).toDF(*columns)

w1 = Window.partitionBy('id').orderBy(F.col('date'))

df2 = (df1.withColumn("group_date", F.when( ~(F.date_add(F.col('snapshot_date'), -1) == F.lag(F.col("snapshot_date"), 1, 0).over(w1)), F.lit(1)).otherwise(F.lit(0))).filter(F.col('group_date')>1)               
 

But I'm not sure how to get the correct end date.

Answer:

This is a case of sessionization: consecutive dates belong to the same session, and a gap of more than one day starts a new one. You can learn more about sessionization with Spark in this article.

Adapting the window-based solution from that article to your specific case gives the following code:

from pyspark.sql import functions as F
from pyspark.sql import Window

columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'), 
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]

dftest = spark.createDataFrame(data).toDF(*columns)

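# Order each id's rows by date so that lag() returns the previous snapshot.
w1 = Window.partitionBy('id').orderBy('snapshot_date')

df2 = (
    dftest
    # 1 when the row is more than one day after the previous row (a new run starts), else 0.
    .withColumn('session_change', F.when(F.datediff(F.col('snapshot_date'), F.lag('snapshot_date').over(w1)) > 1, F.lit(1)).otherwise(F.lit(0)))
    # The running sum of those flags gives each run of consecutive dates its own session_id.
    .withColumn('session_id', F.sum('session_change').over(w1))
    .groupBy('ID', 'session_id')
    # The first and last date of each run are its start and end.
    .agg(F.min('snapshot_date').alias('date'), F.max('snapshot_date').alias('end'))
    .drop('session_id')
)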

That will give us the following df2:

+---+----------+----------+
|ID |date      |end       |
+---+----------+----------+
|1  |2012-12-01|2012-12-03|
|1  |2012-12-05|2012-12-07|
|1  |2012-12-10|2012-12-11|
+---+----------+----------+
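
If it helps to see why the running sum works, here is a minimal plain-Python sketch of the same three steps: compare each date with the previous one, keep a running count of gaps per id, then take the first and last date of each group. It does not use Spark at all. The function name build_timeline is made up for illustration, and the sketch assumes the dates are ISO-formatted strings as in the test data.

```python
from datetime import date
from itertools import groupby


def build_timeline(rows):
    """Hypothetical helper: rows is an iterable of (id, 'YYYY-MM-DD') pairs.

    Returns a list of (id, start, end) tuples, one per run of consecutive dates.
    """
    # Sort by id, then by date: the plain-Python counterpart of
    # Window.partitionBy('id').orderBy('snapshot_date').
    ordered = sorted((rid, date.fromisoformat(d)) for rid, d in rows)

    tagged = []
    prev_id, prev_day, session_id = None, None, 0
    for rid, day in ordered:
        if rid != prev_id:
            session_id = 0  # the running sum restarts for every id
        elif (day - prev_day).days > 1:
            session_id += 1  # gap of more than one day: session_change = 1
        tagged.append((rid, session_id, day))
        prev_id, prev_day = rid, day

    # Rows are already ordered, so equal (id, session_id) keys are adjacent
    # and the first/last day of each group are its min/max.
    timeline = []
    for (rid, _), group in groupby(tagged, key=lambda t: (t[0], t[1])):
        days = [t[2] for t in group]
        timeline.append((rid, days[0].isoformat(), days[-1].isoformat()))
    return timeline


rows = [
    ('1', '2012-12-01'), ('1', '2012-12-02'), ('1', '2012-12-03'),
    ('1', '2012-12-05'), ('1', '2012-12-06'), ('1', '2012-12-07'),
    ('1', '2012-12-10'), ('1', '2012-12-11'),
]
timeline = build_timeline(rows)
for row in timeline:
    print(row)
# ('1', '2012-12-01', '2012-12-03')
# ('1', '2012-12-05', '2012-12-07')
# ('1', '2012-12-10', '2012-12-11')
```

The session counter only ever increases within an id, so every row between two gaps shares the same value, and that is what makes it usable as a grouping key.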