I need help filling in missing values by adding new rows for this case. This is just an example, but I have a lot of rows with different IDs.
Input dataframe:
ID | FLAG | DATE |
---|---|---|
123 | 1 | 01/01/2021 |
123 | 0 | 01/02/2021 |
123 | 1 | 01/03/2021 |
123 | 0 | 01/06/2021 |
123 | 0 | 01/08/2021 |
777 | 0 | 01/01/2021 |
777 | 1 | 01/03/2021 |
So I have a finite set of dates and I want to cover every month up to the last one for each ID (in the example, for ID = 123: 01/01/2021, 01/02/2021, 01/03/2021 ... up to 01/08/2021). Basically I could do a cross join with a calendar, but I don't know how to fill the missing values with a rule or a filter after the cross join.
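Something like the following is what I mean by the cross join (a rough sketch only, assuming df is the input dataframe above and calendar_df is a hypothetical dataframe with one row per month in a DATE column); the part I'm missing is the rule to fill FLAG after the join:
# rough sketch of the cross-join idea; calendar_df is hypothetical
all_rows = df.select("ID").distinct().crossJoin(calendar_df)
joined = all_rows.join(df, ["ID", "DATE"], "left")  # FLAG is null for the generated months
# ...and this is where I don't know how to fill the null FLAG values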
Expected output (the generated missing values are in bold):
ID | FLAG | DATE |
---|---|---|
123 | 1 | 01/01/2021 |
123 | 0 | 01/02/2021 |
123 | 1 | 01/03/2021 |
**123** | **1** | **01/04/2021** |
**123** | **1** | **01/05/2021** |
123 | 0 | 01/06/2021 |
**123** | **0** | **01/07/2021** |
123 | 0 | 01/08/2021 |
777 | 0 | 01/01/2021 |
**777** | **0** | **01/02/2021** |
777 | 1 | 01/03/2021 |
CodePudding user response:
You can first group by `id` to calculate the max and min dates, then use the `sequence` function to generate all the dates from `min_date` to `max_date`. Finally, left join with the original dataframe and fill the nulls with the last non-null value per group of `id`. Here's a complete working example.
Your input dataframe:
from pyspark.sql import Window
import pyspark.sql.functions as F
df = spark.createDataFrame([
    (123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
    (123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
    (123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
    (777, 1, "01/03/2021")
], ["id", "flag", "date"])
Group by `id` and generate all the possible dates for each `id`:
all_dates_df = df.groupBy("id").agg(
F.date_trunc("month", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
F.date_trunc("month", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
"id",
F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
"date", F.explode("date")
).withColumn(
"date",
F.date_format("date", "dd/MM/yyyy")
)
Now, left join with `df` and use the `last` function over a Window partitioned by `id` to fill the null `flag` values:
result = all_dates_df.join(df, ["id", "date"], "left").withColumn(
    "flag",
    # forward-fill: last non-null flag per id, ordered by the parsed date
    # (ordering by the raw dd/MM/yyyy string is only safe within a single year)
    F.last(F.col("flag"), ignorenulls=True).over(
        Window.partitionBy("id").orderBy(F.to_date("date", "dd/MM/yyyy"))
    )
)
result.show()
#+---+----------+----+
#| id| date|flag|
#+---+----------+----+
#|123|01/01/2021| 1|
#|123|01/02/2021| 0|
#|123|01/03/2021| 1|
#|123|01/04/2021| 1|
#|123|01/05/2021| 1|
#|123|01/06/2021| 0|
#|123|01/07/2021| 0|
#|123|01/08/2021| 0|
#|777|01/01/2021| 0|
#|777|01/02/2021| 0|
#|777|01/03/2021| 1|
#+---+----------+----+
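Note that `date` in `result` is still a plain string, so if you need the rows in chronological order across years, sort by the parsed date rather than by the string. A minimal sketch, reusing `result` from above:
# sort by id and the parsed date; sorting the raw dd/MM/yyyy string breaks across years
result.orderBy("id", F.to_date("date", "dd/MM/yyyy")).show()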
CodePudding user response:
You can find the range of dates between the `DATE` value in the current row and the following row, then use `sequence` to generate all the intermediate dates and explode this array to fill in values for the missing dates.
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(123, 1, "01/01/2021",),
        (123, 0, "01/02/2021",),
        (123, 1, "01/03/2021",),
        (123, 0, "01/06/2021",),
        (123, 0, "01/08/2021",),
        (777, 0, "01/01/2021",),
        (777, 1, "01/03/2021",), ]

df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))

window_spec = Window.partitionBy("ID").orderBy("DATE")

# date of the next row for the same ID; for the last row, pretend the next row is one month later
next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.add_months(F.col("DATE"), 1))
# upper bound of the range to generate: one month before the next row's date
next_date_range = F.add_months(next_date, -1)

df.withColumn("Ranges", F.sequence(F.col("DATE"), next_date_range, F.expr("interval 1 month")))\
    .withColumn("DATE", F.explode("Ranges"))\
    .withColumn("DATE", F.date_format("DATE", "dd/MM/yyyy"))\
    .drop("Ranges").show(truncate=False)
Output
+---+----+----------+
|ID |FLAG|DATE |
+---+----+----------+
|123|1 |01/01/2021|
|123|0 |01/02/2021|
|123|1 |01/03/2021|
|123|1 |01/04/2021|
|123|1 |01/05/2021|
|123|0 |01/06/2021|
|123|0 |01/07/2021|
|123|0 |01/08/2021|
|777|0 |01/01/2021|
|777|0 |01/02/2021|
|777|1 |01/03/2021|
+---+----+----------+
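To see how the fill works here, it can help to inspect the generated Ranges column before the explode: each original row's FLAG is simply repeated for every month in its range, so no separate window fill is needed afterwards. A minimal sketch, reusing df and next_date_range from above:
# show the per-row month ranges before exploding; each row's FLAG covers every date in its range
df.withColumn("Ranges", F.sequence(F.col("DATE"), next_date_range, F.expr("interval 1 month")))\
    .show(truncate=False)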