I am trying to build a timeline and I want to detect gaps (discontinuities) in it. I have this test DataFrame:
ID | date |
---|---|
1 | 2012-12-01 |
1 | 2012-12-02 |
1 | 2012-12-03 |
1 | 2012-12-05 |
1 | 2012-12-06 |
1 | 2012-12-07 |
1 | 2012-12-10 |
1 | 2012-12-11 |
And I would like to get a timeline with start and end dates like this:
ID | date | end |
---|---|---|
1 | 2012-12-01 | 2012-12-03 |
1 | 2012-12-05 | 2012-12-07 |
1 | 2012-12-10 | 2012-12-11 |
I've been trying with:
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
w1 = Window.partitionBy('id').orderBy(F.col('snapshot_date'))
df2 = dftest.withColumn("group_date", F.when( ~(F.date_add(F.col('snapshot_date'), -1) == F.lag(F.col("snapshot_date"), 1, 0).over(w1)), F.lit(1)).otherwise(F.lit(0))).filter(F.col('group_date')>0)
But I am not sure how to get the correct end date.
Answer:
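This is a case of sessionization: consecutive rows are grouped into sessions, and a new session starts whenever the gap to the previous row is too large (here, more than one day). You can learn more about sessionization with Spark in this article.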
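To make the idea concrete before moving to Spark, here is a minimal plain-Python sketch of the same logic: flag each date that starts a new run, take the running sum of the flags to get a session number, then keep the first and last date of each session. The function name `sessionize` and the parameter `max_gap_days` are illustrative only, and the sketch assumes the dates are ISO-formatted strings for a single ID.

```python
from datetime import date
from itertools import accumulate, groupby


def sessionize(dates, max_gap_days=1):
    """Return (start, end) pairs for runs of dates whose gaps are <= max_gap_days."""
    days = sorted(date.fromisoformat(d) for d in dates)
    if not days:
        return []
    # 1 where the gap to the previous date is too large (a new session starts), else 0.
    # The first date has no predecessor, so its flag is 0.
    changes = [0] + [
        int((cur - prev).days > max_gap_days) for prev, cur in zip(days, days[1:])
    ]
    # The running sum of the flags assigns a session number to every date.
    session_ids = list(accumulate(changes))
    sessions = []
    for _, group in groupby(zip(session_ids, days), key=lambda pair: pair[0]):
        run = [d for _, d in group]
        sessions.append((run[0].isoformat(), run[-1].isoformat()))
    return sessions


snapshot_dates = [
    '2012-12-01', '2012-12-02', '2012-12-03',
    '2012-12-05', '2012-12-06', '2012-12-07',
    '2012-12-10', '2012-12-11',
]
print(sessionize(snapshot_dates))
```

For the test data, the flags are 0, 0, 0, 1, 0, 0, 1, 0 and the running sum is 0, 0, 0, 1, 1, 1, 2, 2, so the dates fall into three sessions.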
Adapting the window-based solution from the article cited above to your specific case gives the following code:
from pyspark.sql import functions as F
from pyspark.sql import Window
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
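# Order each id's rows by date so that lag() returns the previous snapshot
w1 = Window.partitionBy('id').orderBy('snapshot_date')
# session_change is 1 when the gap to the previous date exceeds one day, else 0
# (the first row has no previous date, so the comparison is null and it gets 0);
# the running sum of those flags over the same window is the session_id
df2 = dftest \
.withColumn('session_change', F.when(F.datediff(F.col('snapshot_date'), F.lag('snapshot_date').over(w1)) > 1, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('session_id', F.sum('session_change').over(w1)) \
.groupBy('ID', 'session_id') \
.agg(F.min('snapshot_date').alias('date'), F.max('snapshot_date').alias('end')) \
.drop('session_id')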
That gives the following df2:
+---+----------+----------+
|ID |date      |end       |
+---+----------+----------+
|1  |2012-12-01|2012-12-03|
|1  |2012-12-05|2012-12-07|
|1  |2012-12-10|2012-12-11|
+---+----------+----------+
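One thing to keep in mind: snapshot_date is a string column in this example, so F.min and F.max compare text rather than dates. That yields the right start and end values here only because yyyy-MM-dd strings sort in chronological order. The plain-Python sketch below illustrates the difference using a made-up day-first format (the sample values are hypothetical); if your real data is not in ISO format, converting the column with F.to_date before the window step is probably the safer choice.

```python
from datetime import datetime

# ISO yyyy-MM-dd strings sort in the same order as the dates they represent.
iso_dates = ['2012-12-10', '2012-12-05', '2012-12-07']
print(min(iso_dates), max(iso_dates))

# A hypothetical day-first format: string order and date order disagree.
dmy_dates = ['10-12-2012', '05-01-2013']
earliest_as_text = min(dmy_dates)
earliest_as_date = min(dmy_dates, key=lambda s: datetime.strptime(s, '%d-%m-%Y'))
print(earliest_as_text, earliest_as_date)
```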