How to mark overlapping time range in PySpark dataframe?


I want to flag rows whose start/end time ranges overlap with another row that has the same key. For example, given a dataframe like:

+---+-------------------+-------------------+
|key|start_date         |end_date           |
+---+-------------------+-------------------+
|A  |2022-01-11 00:00:00|8888-12-31 00:00:00|
|B  |2020-01-01 00:00:00|2022-02-10 00:00:00|
|B  |2019-02-08 00:00:00|2020-02-15 00:00:00|
|B  |2022-02-16 00:00:00|2022-12-15 00:00:00|
|C  |2018-01-01 00:00:00|2122-02-10 00:00:00|
+---+-------------------+-------------------+

the resulting dataframe would have the first and second B records flagged, since their time ranges overlap. Like this:

+---+-------------------+-------------------+-----+
|key|start_date         |end_date           |valid|
+---+-------------------+-------------------+-----+
|A  |2022-01-11 00:00:00|8888-12-31 00:00:00|true |
|B  |2020-01-01 00:00:00|2022-02-10 00:00:00|false|
|B  |2019-02-08 00:00:00|2020-02-15 00:00:00|false|
|B  |2022-02-16 00:00:00|2022-12-15 00:00:00|true |
|C  |2018-01-01 00:00:00|2122-02-10 00:00:00|true |
+---+-------------------+-------------------+-----+

Answer:

I've previously posted scripts that combine overlapping date ranges. For your case, I've modified the last of those scripts slightly: instead of a final groupBy that merges the overlapping ranges, I've added a window function that just flags them.

Test input:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
     ('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
     ('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
     ('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
     ('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')],
    ['key', 'start_date', 'end_date'])

Script:

# w1: rows of the same key, ordered by start_date (running window)
w1 = W.partitionBy("key").orderBy("start_date")
# w2: rows that ended up in the same overlap group
w2 = W.partitionBy("key", "contiguous_grp")

# Running maximum of end_date up to and including the current row
max_end = F.max("end_date").over(w1)
# A new group starts when the latest end_date among all previous rows
# lies strictly before the current row's start_date (i.e. no overlap)
contiguous = F.when(F.datediff(F.lag(max_end).over(w1), "start_date") < 0, 1).otherwise(0)

df = (df
    .withColumn("contiguous_grp", F.sum(contiguous).over(w1))  # cumulative group id
    .withColumn("valid", F.count(F.lit(1)).over(w2) == 1)      # alone in its group => no overlap
    .drop("contiguous_grp")
)
df.show()
# +---+-------------------+-------------------+-----+
# |key|         start_date|           end_date|valid|
# +---+-------------------+-------------------+-----+
# |  A|2022-01-11 00:00:00|8888-12-31 00:00:00| true|
# |  B|2019-02-08 00:00:00|2020-02-15 00:00:00|false|
# |  B|2020-01-01 00:00:00|2022-02-10 00:00:00|false|
# |  B|2022-02-16 00:00:00|2022-12-15 00:00:00| true|
# |  C|2018-01-01 00:00:00|2122-02-10 00:00:00| true|
# +---+-------------------+-------------------+-----+
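As a sanity check, the same grouping idea can be sketched in plain Python without Spark: sort each key's rows by start, start a new group whenever the running maximum end lies strictly before the current start, and mark a row valid only if it is alone in its group. The `flag_overlaps` helper is illustrative, not part of the original answer; it compares the fixed-width timestamp strings lexicographically, which works because they all share the same format (note the Spark version compares via `datediff`, i.e. at day granularity).

```python
from collections import defaultdict

def flag_overlaps(rows):
    """Flag rows whose [start, end] range overlaps another row with the same key.

    rows: list of (key, start, end) tuples with fixed-format timestamp strings.
    Returns a dict mapping each input row to True (valid) or False (overlapping).
    """
    by_key = defaultdict(list)
    for row in rows:
        by_key[row[0]].append(row)

    valid = {}
    for key, items in by_key.items():
        items.sort(key=lambda r: r[1])          # order by start_date
        groups = []                             # list of overlap groups
        running_max_end = None                  # latest end_date seen so far
        for row in items:
            start, end = row[1], row[2]
            if running_max_end is None or running_max_end < start:
                groups.append([row])            # previous rows all ended before this start
            else:
                groups[-1].append(row)          # overlaps the current group
            running_max_end = end if running_max_end is None else max(running_max_end, end)
        for grp in groups:
            for row in grp:
                valid[row] = len(grp) == 1      # alone in its group => no overlap
    return valid

data = [('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
        ('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
        ('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
        ('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
        ('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')]
result = flag_overlaps(data)
# The two earlier B rows overlap (False); the rest are valid (True).
```

Tracking the running maximum end (rather than just the previous row's end) matters: it catches the case where an early row with a very late end_date overlaps several later rows.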