Home > front end >  Determine if dates are continuous in a list
Determine if dates are continuous in a list

Time:11-09

I have a dataframe containing the id of some person and the date on which he performed a certain action:

 ---- ---------- 
|  id|      date|
 ---- ---------- 
|   1|2022-09-01|
|   1|2022-10-01|
|   1|2022-11-01|
|   2|2022-07-01|
|   2|2022-10-01|
|   2|2022-11-01|
|   3|2022-09-01|
|   3|2022-10-01|
|   3|2022-11-01|
 ---- ---------- 

I need to determine the fact that this person performed some action over a certain period of time (suppose the last 3 months). In a specific example, person number 2 missed months 08 and 09, respectively, the condition was not met. So I expect to get the following result:

 ---- ------------------------------------ ------ 
|  id|                               dates|3month|
 ---- ------------------------------------ ------ 
|   1|[2022-09-01, 2022-10-01, 2022-11-01]|  true|
|   2|[2022-07-01, 2022-10-01, 2022-11-01]| false|
|   3|[2022-09-01, 2022-10-01, 2022-11-01]|  true|
 ---- ------------------------------------ ------ 

I understand that I should group by person ID and get an array of dates that correspond to it.

data.groupBy(col("id")).agg(collect_list("date") as "dates").withColumn("3month", ???)

However, I'm at a loss in writing a function that would carry out a check for compliance with the requirement.I have an option using recursion, but it does not suit me due to low performance (there may be more than one thousand dates). I would be very grateful if someone could help me with my problem.

CodePudding user response:

A simple trick is to use a set instead of a list in your aggregation, in order to have distinct values, and then check the size of that set. Here are some possible solutions:

Solution 1

Assuming you have a list of months of interest on which you want to check, you can perform a preliminary filter on the required months, then aggregate and validate.

import org.apache.spark.sql.{functions => F}
import java.time.{LocalDate, Duration}

val requiredMonths = Seq(
    LocalDate.parse("2022-09-01"),
    LocalDate.parse("2022-10-01"),
    LocalDate.parse("2022-11-01")
);

df
    .filter(F.date_trunc("month", $"date").isInCollection(requiredMonths))
    .groupBy($"id")
    .agg(F.collect_set(F.date_trunc("month", $"date")) as "months")
    .withColumn("is_valid", F.size($"months") === requiredMonths.size)

date_trunc is used to truncate the date column to month.

Solution 2

Similar to the previous one, with preliminary filter, but here assuming you have a range of months

import java.time.temporal.ChronoUnit

val firstMonth = LocalDate.parse("2022-09-01");
val lastMonth = LocalDate.parse("2022-11-01");

val requiredNumberOfMonths = ChronoUnit.MONTHS.between(firstMonth, lastMonth)   1;

df
  .withColumn("month", F.date_trunc("month", $"date"))
  .filter($"month" >= firstMonth && $"month" <= lastMonth)
  .groupBy($"id")
  .agg(F.collect_set($"month") as "months")
  .withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)
        

Solution 3

Both solution 1 and 2 have a problem that causes the complete exclusion from the final result of the ids that have no intersection with the dates of interest. This is caused by the filter applied before grouping.

Here is a solution based on solution 2 that does not filter and solves this problem.

df
   .withColumn("month", F.date_trunc("month", $"date"))
   .groupBy($"id")
   .agg(F.collect_set(F.when($"month" >= firstMonth && $"month" <= lastMonth, $"month")) as "months")
   .withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)

Now the filter is performed using a conditional collect_set.

It is right to consider also solution 1 and 2 because the preliminary filter can have advantages and in some cases that could be the expected result.

  • Related