I have a dataframe containing the id of some person and the date on which he performed a certain action:
+---+----------+
| id|      date|
+---+----------+
|  1|2022-09-01|
|  1|2022-10-01|
|  1|2022-11-01|
|  2|2022-07-01|
|  2|2022-10-01|
|  2|2022-11-01|
|  3|2022-09-01|
|  3|2022-10-01|
|  3|2022-11-01|
+---+----------+
I need to determine whether this person performed the action in every month of a certain period (say, the last 3 months). In this example, person number 2 missed months 08 and 09, so the condition is not met. I expect to get the following result:
+---+------------------------------------+------+
| id|                               dates|3month|
+---+------------------------------------+------+
|  1|[2022-09-01, 2022-10-01, 2022-11-01]|  true|
|  2|[2022-07-01, 2022-10-01, 2022-11-01]| false|
|  3|[2022-09-01, 2022-10-01, 2022-11-01]|  true|
+---+------------------------------------+------+
I understand that I should group by person ID and get an array of dates that correspond to it.
data.groupBy(col("id")).agg(collect_list("date") as "dates").withColumn("3month", ???)
However, I'm at a loss writing a function that checks compliance with this requirement. I have an option using recursion, but it does not suit me due to low performance (there may be more than a thousand dates). I would be very grateful if someone could help me with my problem.
CodePudding user response:
A simple trick is to use a set (collect_set) instead of a list (collect_list) in your aggregation, in order to have distinct values, and then check the size of that set.
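To see why distinct values matter, here is a minimal plain-Scala sketch (with made-up sample dates, not taken from the question): a person acting twice in the same month inflates a list of months but not a set.

```scala
import java.time.LocalDate

// Hypothetical sample: two actions in September, one each in October and
// November. Truncating to the first of the month and keeping a list
// counts 4 entries; a set counts only 3 distinct months.
val dates = Seq("2022-09-01", "2022-09-15", "2022-10-01", "2022-11-01")
  .map(LocalDate.parse)

val monthList = dates.map(_.withDayOfMonth(1)) // keeps the duplicate
val monthSet  = monthList.toSet                // deduplicates

println(monthList.size) // 4
println(monthSet.size)  // 3
```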
Here are some possible solutions:
Solution 1
Assuming you have a list of months of interest on which you want to check, you can perform a preliminary filter on the required months, then aggregate and validate.
import org.apache.spark.sql.{functions => F}
import spark.implicits._ // assumes a SparkSession named spark is in scope
import java.time.LocalDate

val requiredMonths = Seq(
  LocalDate.parse("2022-09-01"),
  LocalDate.parse("2022-10-01"),
  LocalDate.parse("2022-11-01")
)

df
  .filter(F.date_trunc("month", $"date").isInCollection(requiredMonths))
  .groupBy($"id")
  .agg(F.collect_set(F.date_trunc("month", $"date")) as "months")
  .withColumn("is_valid", F.size($"months") === requiredMonths.size)
date_trunc is used to truncate the date column to the first day of its month.
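As a plain-Scala illustration of the same filter step (using java.time instead of Spark, with hypothetical sample dates): truncate each date to the first of its month, then keep only the months in the required set.

```scala
import java.time.LocalDate

// Same requiredMonths values as in the Spark snippet above.
val requiredMonths = Set(
  LocalDate.parse("2022-09-01"),
  LocalDate.parse("2022-10-01"),
  LocalDate.parse("2022-11-01")
)

// Hypothetical input dates: July falls outside the required months.
val dates = Seq("2022-07-01", "2022-10-14", "2022-11-30").map(LocalDate.parse)

// withDayOfMonth(1) plays the role of date_trunc("month", ...).
val kept = dates.map(_.withDayOfMonth(1)).filter(requiredMonths.contains)
println(kept) // List(2022-10-01, 2022-11-01)
```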
Solution 2
Similar to the previous one, with a preliminary filter, but here assuming you have a range of months:
import java.time.temporal.ChronoUnit

val firstMonth = LocalDate.parse("2022-09-01")
val lastMonth = LocalDate.parse("2022-11-01")
val requiredNumberOfMonths = ChronoUnit.MONTHS.between(firstMonth, lastMonth) + 1
df
  .withColumn("month", F.date_trunc("month", $"date"))
  .filter($"month" >= firstMonth && $"month" <= lastMonth)
  .groupBy($"id")
  .agg(F.collect_set($"month") as "months")
  .withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)
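The month-count arithmetic can be checked on its own: ChronoUnit.MONTHS.between is exclusive of the end month, which is why 1 is added.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

val firstMonth = LocalDate.parse("2022-09-01")
val lastMonth  = LocalDate.parse("2022-11-01")

// between(2022-09-01, 2022-11-01) returns 2 complete months,
// so the inclusive count of months in the range is 2 + 1 = 3.
val requiredNumberOfMonths = ChronoUnit.MONTHS.between(firstMonth, lastMonth) + 1
println(requiredNumberOfMonths) // 3
```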
Solution 3
Both solutions 1 and 2 have a problem: ids that have no intersection with the dates of interest are completely excluded from the final result. This is caused by the filter applied before grouping. Here is a solution, based on solution 2, that does not filter and therefore avoids this problem.
df
  .withColumn("month", F.date_trunc("month", $"date"))
  .groupBy($"id")
  .agg(F.collect_set(F.when($"month" >= firstMonth && $"month" <= lastMonth, $"month")) as "months")
  .withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)
Now the filter is performed with a conditional collect_set: when without an otherwise branch yields null outside the range, and collect_set ignores null values.
It is still worth considering solutions 1 and 2, because the preliminary filter can have performance advantages, and in some cases excluding the non-matching ids could be exactly the expected result.
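For reference, here is an end-to-end sketch of the same logic on plain Scala collections (no Spark), using the sample data from the question. This only illustrates the check itself, not a replacement for the DataFrame version.

```scala
import java.time.LocalDate

// Sample data from the question: (id, action date).
val data = Seq(
  (1, "2022-09-01"), (1, "2022-10-01"), (1, "2022-11-01"),
  (2, "2022-07-01"), (2, "2022-10-01"), (2, "2022-11-01"),
  (3, "2022-09-01"), (3, "2022-10-01"), (3, "2022-11-01")
).map { case (id, d) => (id, LocalDate.parse(d)) }

val firstMonth = LocalDate.parse("2022-09-01")
val lastMonth  = LocalDate.parse("2022-11-01")

// Group by id, collect distinct truncated months inside the window,
// and compare the count with the 3 required months.
val result = data.groupBy(_._1).map { case (id, rows) =>
  val months = rows
    .map(_._2.withDayOfMonth(1))
    .filter(m => !m.isBefore(firstMonth) && !m.isAfter(lastMonth))
    .toSet
  id -> (months.size == 3)
}

println(result(1)) // true
println(result(2)) // false  (July is outside the window, so only 2 months)
println(result(3)) // true
```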