I have a table with dates in MM/DD/YYYY format, like below:
+---+----------+----------+
| id| startdate|   enddate|
+---+----------+----------+
|  1|01/01/2022|01/31/2022|
|  1|02/01/2022|02/28/2022|
|  1|03/01/2022|03/31/2022|
|  2|01/01/2022|03/01/2022|
|  2|03/05/2022|03/31/2022|
|  2|04/01/2022|04/05/2022|
+---+----------+----------+
How do I group by the id column and merge rows whose start and end dates are continuous?
If there is a gap of more than one day between one row's enddate and the next row's startdate, the next row should start a new line, so the above table becomes:
+---+----------+----------+
| id| startdate|   enddate|
+---+----------+----------+
|  1|01/01/2022|03/31/2022|
|  2|01/01/2022|03/01/2022|
|  2|03/05/2022|04/05/2022|
+---+----------+----------+
id = 1 becomes one row because all of its dates are continuous (no gap greater than one day), but id = 2 has two rows because there is a gap between 03/01/2022 and 03/05/2022.
CodePudding user response:
This is a particular case of the sessionization problem (i.e. identifying sessions in data based on some condition).
Here is a possible solution that uses window functions. The logic behind the solution:
- Associate with each row the temporally previous enddate with the same id
- Calculate the difference in days between each startdate and the previous enddate
- Identify all the rows that don't have a previous row or are at least two days after the previous row
- Associate with each row a session_index, that is the number of new sessions seen up to this line
- Aggregate, grouping by id and session_index
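from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per id, ordered chronologically by start date
w = Window.partitionBy("id").orderBy("startdate")

df = (
    df.select(
        F.col("id"),
        # Parse the MM/DD/YYYY strings into real dates
        F.to_date("startdate", "MM/dd/yyyy").alias("startdate"),
        F.to_date("enddate", "MM/dd/yyyy").alias("enddate"),
    )
    # enddate of the previous row for the same id (null for the first row)
    .withColumn("previous_enddate", F.lag("enddate", 1).over(w))
    .withColumn("date_diff", F.datediff(F.col("startdate"), F.col("previous_enddate")))
    # A new session starts on the first row of an id or after a gap of more than one day
    .withColumn("is_new_session", F.col("date_diff").isNull() | (F.col("date_diff") > 1))
    # Running count of session starts gives each session its own index
    .withColumn("session_index", F.sum(F.col("is_new_session").cast("int")).over(w))
)

# Collapse each session into a single row
# (startdate/enddate are now dates; F.date_format can turn them back into MM/dd/yyyy strings)
result = (
    df.groupBy("id", "session_index")
    .agg(
        F.min("startdate").alias("startdate"),
        F.max("enddate").alias("enddate"),
    )
    .drop("session_index")
)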
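
If you want to sanity-check the logic without a Spark session, here is a rough plain-Python sketch of the same steps (compare each startdate with the previous enddate for the same id, start a new session on a gap of more than one day, then take the min start and max end per session). The function name merge_continuous and the max_gap_days parameter are made up for this illustration and are not part of the Spark solution above.

```python
from datetime import datetime
from itertools import groupby


def merge_continuous(rows, fmt="%m/%d/%Y", max_gap_days=1):
    """Merge (id, startdate, enddate) string rows that are continuous per id."""
    # Parse the date strings and sort by id, then by start date
    parsed = sorted(
        (row_id, datetime.strptime(start, fmt).date(), datetime.strptime(end, fmt).date())
        for row_id, start, end in rows
    )
    merged = []
    for row_id, group in groupby(parsed, key=lambda r: r[0]):
        session_start = session_end = prev_end = None
        for _, start, end in group:
            # New session: no previous row, or a gap larger than max_gap_days
            if prev_end is None or (start - prev_end).days > max_gap_days:
                if session_start is not None:
                    merged.append((row_id, session_start, session_end))
                session_start, session_end = start, end
            else:
                session_end = max(session_end, end)
            prev_end = end
        merged.append((row_id, session_start, session_end))
    # Format the dates back into the input string format
    return [(i, s.strftime(fmt), e.strftime(fmt)) for i, s, e in merged]


rows = [
    (1, "01/01/2022", "01/31/2022"),
    (1, "02/01/2022", "02/28/2022"),
    (1, "03/01/2022", "03/31/2022"),
    (2, "01/01/2022", "03/01/2022"),
    (2, "03/05/2022", "03/31/2022"),
    (2, "04/01/2022", "04/05/2022"),
]
for row in merge_continuous(rows):
    print(row)
```

On the sample table from the question this gives one row for id 1 and two rows for id 2, matching the expected output.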