Home > Mobile >  PySpark generate missing dates and fill data with previous value
PySpark generate missing dates and fill data with previous value

Time:12-24

I need help for this case to fill, with a new row, missing values:

This is just an example, but I have a lot of rows with different IDs.

Input dataframe:

ID FLAG DATE
123 1 01/01/2021
123 0 01/02/2021
123 1 01/03/2021
123 0 01/06/2021
123 0 01/08/2021
777 0 01/01/2021
777 1 01/03/2021

So I have a finite set of dates and I wanna take until the last one for each ID (in the example, for ID = 123: 01/01/2021, 01/02/2021, 01/03/2021... until 01/08/2021). So basically I could do a cross join with a calendar, but I don't know how can I fill missing value with a rule or a filter, after the cross join.

Expected output: (in bold the generated missing values)

ID FLAG DATE
123 1 01/01/2021
123 0 01/02/2021
123 1 01/03/2021
123 1 01/04/2021
123 1 01/05/2021
123 0 01/06/2021
123 0 01/07/2021
123 0 01/08/2021
777 0 01/01/2021
777 0 01/02/2021
777 1 01/03/2021

CodePudding user response:

You can first group by id to calculate max and min date then using sequence function, generate all the dates from min_date to max_date. Finally, join with original dataframe and fill nulls with last non null per group of id. Here's a complete working example:

Your input dataframe:

from pyspark.sql import Window
import pyspark.sql.functions as F

df = spark.createDataFrame([
    (123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
    (123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
    (123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
    (777, 1, "01/03/2021")
], ["id", "flag", "date"])

Groupby id and generate all possible dates for each id:

all_dates_df = df.groupBy("id").agg(
    F.date_trunc("month", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
    F.date_trunc("month", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
    "id",
    F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
    "date", F.explode("date")
).withColumn(
    "date",
    F.date_format("date", "dd/MM/yyyy")
)

Now, left join with df and use last function over a Window partitioned by id to fill null flag values:

result = all_dates_df.join(df, ["id", "date"], "left").withColumn(
   "flag",
   F.last(F.col("flag"), ignorenulls=True).over(Window.partitionBy("id").orderBy("date"))
)

result.show()
# --- ---------- ---- 
#| id|      date|flag|
# --- ---------- ---- 
#|123|01/01/2021|   1|
#|123|01/02/2021|   0|
#|123|01/03/2021|   1|
#|123|01/04/2021|   1|
#|123|01/05/2021|   1|
#|123|01/06/2021|   0|
#|123|01/07/2021|   0|
#|123|01/08/2021|   0|
#|777|01/01/2021|   0|
#|777|01/02/2021|   0|
#|777|01/03/2021|   1|
# --- ---------- ---- 

CodePudding user response:

You can find the ranges of dates between the DATE value in the current row and the following row and then use sequence to generate all intermediate dates and explode this array to fill in values for the missing dates.

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(123, 1, "01/01/2021",),
        (123, 0, "01/02/2021",),
        (123, 1, "01/03/2021",),
        (123, 0, "01/06/2021",),
        (123, 0, "01/08/2021",),
        (777, 0, "01/01/2021",),
        (777, 1, "01/03/2021",), ]

df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))

window_spec = Window.partitionBy("ID").orderBy("DATE")

next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.add_months(F.col("DATE"), 1))

next_date_range = F.add_months(next_date, -1)

df.withColumn("Ranges", F.sequence(F.col("DATE"), next_date_range, F.expr("interval 1 month")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.date_format("date", "dd/MM/yyyy"))\
  .drop("Ranges").show(truncate=False)

Output

 --- ---- ---------- 
|ID |FLAG|DATE      |
 --- ---- ---------- 
|123|1   |01/01/2021|
|123|0   |01/02/2021|
|123|1   |01/03/2021|
|123|1   |01/04/2021|
|123|1   |01/05/2021|
|123|0   |01/06/2021|
|123|0   |01/07/2021|
|123|0   |01/08/2021|
|777|0   |01/01/2021|
|777|0   |01/02/2021|
|777|1   |01/03/2021|
 --- ---- ---------- 
  • Related