I want to mark rows where start and end time overlaps based on keys. For example, if given a dataframe like:
--- ------------------- -------------------
|key|start_date |end_date |
--- ------------------- -------------------
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|
--- ------------------- -------------------
the resulting dataframe would have the first and second B records flagged, since their start and end times overlap. Like this:
--- ------------------- ------------------- -----
|key|start_date |end_date |valid|
--- ------------------- ------------------- -----
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|true |
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|false|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|false|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|true |
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|true |
--- ------------------- ------------------- -----
CodePudding user response:
Here I've added scripts to combine overlapping date ranges. In your case, I had modified the last script slightly - instead of final groupBy
for overlapping ranges, I have added a window function which just flags them.
Test input:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')],
['key', 'start_date', 'end_date'])
Script:
w1 = W.partitionBy("key").orderBy("start_date")
w2 = W.partitionBy("key", "contiguous_grp")
max_end = F.max("end_date").over(w1)
contiguous = F.when(F.datediff(F.lag(max_end).over(w1), "start_date") < 0, 1).otherwise(0)
df = (df
.withColumn("contiguous_grp", F.sum(contiguous).over(w1))
.withColumn("valid", (F.count(F.lit(1)).over(w2)) == 1)
.drop("contiguous_grp")
)
df.show()
# --- ------------------- ------------------- -----
# |key| start_date| end_date|valid|
# --- ------------------- ------------------- -----
# | A|2022-01-11 00:00:00|8888-12-31 00:00:00| true|
# | B|2019-02-08 00:00:00|2020-02-15 00:00:00|false|
# | B|2020-01-01 00:00:00|2022-02-10 00:00:00|false|
# | B|2022-02-16 00:00:00|2022-12-15 00:00:00| true|
# | C|2018-01-01 00:00:00|2122-02-10 00:00:00| true|
# --- ------------------- ------------------- -----