Add missing dates in the column in a PySpark data frame

Time:08-09

I need to insert the missing entries into a PySpark data frame where each date is the final day of a quarter.

ID Date count
1 2018-03-31 1
1 2018-06-30 8
1 2018-12-31 1
1 2019-06-30 2

I need the output to look like this:

ID Date count
1 2018-03-31 1
1 2018-06-30 8
1 2018-09-30 Null
1 2018-12-31 1
1 2019-03-31 Null
1 2019-06-30 2

There are hundreds of such IDs, and the range of quarter dates is fixed for all IDs, i.e., '2018-03-31' to '2019-06-30'.

CodePudding user response:

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, '2018-03-31', 1),
     (1, '2018-06-30', 8),
     (1, '2018-12-31', 1),
     (1, '2019-06-30', 2)],
    ['ID', 'Date', 'count'])

Option for the case where the minimum date of the sequence falls on the 31st of a month

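# Overall date range across all IDs
min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]

df = (df
    .select("ID").distinct()
    # One row per ID for every date in the 3-month sequence
    .withColumn("Date", F.expr(f"explode(sequence(to_date('{min_date}'), to_date('{max_date}'), interval 3 month))"))
    # Left join keeps generated dates that have no count (null)
    .join(df, ["ID", "Date"], "left")
)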
df.show()
# +---+----------+-----+
# | ID|      Date|count|
# +---+----------+-----+
# |  1|2018-03-31|    1|
# |  1|2018-06-30|    8|
# |  1|2019-03-31| null|
# |  1|2018-12-31|    1|
# |  1|2018-09-30| null|
# |  1|2019-06-30|    2|
# +---+----------+-----+
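
The start day matters because month arithmetic clamps the day to the last valid day of the target month. The plain-Python sketch below mimics that behaviour, assuming each element is computed as the start date plus a whole multiple of three months (which is consistent with the output above). `add_months` and `quarter_steps` are hypothetical helpers written for illustration, not Spark functions.

```python
import calendar
from datetime import date

def add_months(d, months):
    """Add whole months to d, clamping the day to the end of the target month."""
    total = d.year * 12 + (d.month - 1) + months
    year, month = divmod(total, 12)
    month += 1
    last = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last))

def quarter_steps(start, stop):
    """Hypothetical stand-in for sequence(start, stop, interval 3 month)."""
    out = []
    i = 0
    while True:
        cur = add_months(start, 3 * i)  # always offset from the start date
        if cur > stop:
            break
        out.append(cur)
        i += 1
    return out

from_31 = quarter_steps(date(2018, 3, 31), date(2019, 6, 30))
from_30 = quarter_steps(date(2018, 6, 30), date(2019, 6, 30))
print([d.isoformat() for d in from_31])
print([d.isoformat() for d in from_30])
```

Starting on the 31st, every step lands on a quarter end. Starting on 2018-06-30, the sequence contains 2018-12-30 and 2019-03-30, which would not match the real quarter ends in the join.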

Option that works whether the first date in the sequence falls on the 30th or the 31st

This option is safer if you don't know what your min date is. Everything is the same, except that before the join the dates are shifted one day forward, so every quarter end becomes the first day of the following month. After the join they are shifted back to the original dates.

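# Shift every date one day forward: quarter ends become the 1st of the next month
df = df.withColumn('Date', F.date_add('Date', 1))
min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]
df = (df
    .select("ID").distinct()
    # One row per ID for every shifted date in the 3-month sequence
    .withColumn("Date", F.expr(f"explode(sequence(to_date('{min_date}'), to_date('{max_date}'), interval 3 month))"))
    .join(df, ["ID", "Date"], "left")
)
# Shift back to the original quarter-end dates
df = df.withColumn('Date', F.date_sub('Date', 1))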

df.show()
# +---+----------+-----+
# | ID|      Date|count|
# +---+----------+-----+
# |  1|2018-03-31|    1|
# |  1|2018-06-30|    8|
# |  1|2019-03-31| null|
# |  1|2018-12-31|    1|
# |  1|2018-09-30| null|
# |  1|2019-06-30|    2|
# +---+----------+-----+
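
The one-day shift works because the day after any quarter end is the first day of a month, and stepping from a first-of-month date never needs clamping. A minimal plain-Python sketch of the same idea follows. `quarter_ends` is a hypothetical helper, and it assumes both bounds are month-end dates.

```python
from datetime import date, timedelta

def quarter_ends(first_end, last_end):
    """List month-end dates every 3 months from first_end to last_end (inclusive).

    Assumes first_end and last_end are both the last day of a month.
    """
    one_day = timedelta(days=1)
    cur = first_end + one_day   # shift right: now the 1st of a month
    stop = last_end + one_day
    out = []
    while cur <= stop:
        out.append(cur - one_day)  # shift back to the month end
        # Advance three months; the day stays 1, so no clamping is needed.
        total = cur.year * 12 + (cur.month - 1) + 3
        cur = date(total // 12, total % 12 + 1, 1)
    return out

ends = quarter_ends(date(2018, 6, 30), date(2019, 6, 30))
print([d.isoformat() for d in ends])
```

Unlike a sequence that starts directly on 2018-06-30, this one yields 2018-12-31 and 2019-03-31 rather than the 30th of those months.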