PySpark DataFrame Scenario:
- There is a DataFrame called DF. Two main columns of DF are ID and Date.
- Each ID has on average 40 unique Dates (the dates are not continuous).
- There is a second DataFrame called DF_date, which has one column named Date. The dates in DF_date range between the minimum and maximum of Date from DF.
- The goal is to fill DF with the continuous dates between the start and end date of each unique ID. The missing dates are filled in with a left join between DF_date and DF, and, as the expected output below shows, Val for a filled-in date is carried forward from the most recent earlier date.
DF
+----------+--------+-------+
|      Date|     Val|     ID|
+----------+--------+-------+
|2021-07-01|81119.73|Ax3838J|
|2021-07-04|81289.62|Ax3838J|
|2021-07-05|81385.62|Ax3838J|
|2021-07-02|81249.76|Bz3838J|
|2021-07-05|81324.28|Bz3838J|
|2021-07-06|81329.28|Bz3838J|
+----------+--------+-------+
DF_date
+----------+
|      Date|
+----------+
|2021-07-01|
|2021-07-02|
|2021-07-03|
|2021-07-04|
|2021-07-05|
|2021-07-06|
+----------+
Expected Final Output:
+----------+--------+-------+
|      Date|     Val|     ID|
+----------+--------+-------+
|2021-07-01|81119.73|Ax3838J|
|2021-07-02|81119.73|Ax3838J|
|2021-07-03|81119.73|Ax3838J|
|2021-07-04|81289.62|Ax3838J|
|2021-07-05|81385.62|Ax3838J|
|2021-07-02|81249.76|Bz3838J|
|2021-07-03|81249.76|Bz3838J|
|2021-07-04|81249.76|Bz3838J|
|2021-07-05|81324.28|Bz3838J|
|2021-07-06|81329.28|Bz3838J|
+----------+--------+-------+
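In other words, something along these lines (a minimal sketch of the left-join approach described above; DF and DF_date are as defined in the question, and the forward fill of Val is inferred from the expected output):
import pyspark.sql.functions as F
from pyspark.sql import Window

# Each ID's own start and end date
id_ranges = DF.groupBy("ID").agg(F.min("Date").alias("min_date"),
                                 F.max("Date").alias("max_date"))

# Every date in DF_date that falls inside each ID's range
calendar = (id_ranges.crossJoin(DF_date)
            .where(F.col("Date").between(F.col("min_date"), F.col("max_date")))
            .select("ID", "Date"))

# Left join back to DF, then forward-fill Val within each ID
w = Window.partitionBy("ID").orderBy("Date")
filled = (calendar.join(DF, ["ID", "Date"], "left")
          .withColumn("Val", F.last("Val", ignorenulls=True).over(w)))
filled.show()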
CodePudding user response:
Your question doesn't quite make sense: why build a DF_date DataFrame from the start and end dates, use it to fill in the dates, and then resort to DF's own start and end dates anyway? Why not just fill in the missing dates using the min and max date of each group in DF?
Anyway, this is how you fill in the missing dates from DF's own per-group date range.
Following your comments, see my edits:
import pyspark.sql.functions as F
from pyspark.sql import Window

new = (DF.groupby('ID')
       .agg(F.to_date(F.min('Date')).alias('min_date'),  # earliest date per group
            F.to_date(F.max('Date')).alias('max_date'),  # latest date per group
            # collect Date and Val into one array per group
            *[F.collect_list(i).alias(i) for i in DF.drop('ID').columns])
       # zip the Date/Val arrays with the full date sequence; the shorter
       # arrays are padded with nulls, and inline explodes the structs into
       # columns Date, Val and 2 (the unnamed sequence column)
       .selectExpr("ID", "inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
       # drop the original Date and make the generated sequence the new Date
       .drop('Date').withColumnRenamed('2', 'Date')
       # forward-fill Val down each ID, ordered by the new Date
       .withColumn('Val', F.last('Val', ignorenulls=True)
                    .over(Window.partitionBy('ID').orderBy('Date'))))
new.show()
+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
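Note that arrays_zip pads the shorter Date and Val arrays with nulls, which is why each original Val pairs with the first dates of the generated sequence rather than with its own original date, and the forward fill then carries the last value down. A quick standalone way to see the padding behaviour (assuming an active SparkSession named spark):
spark.sql("SELECT arrays_zip(array('a','b'), sequence(1, 4)) AS zipped").show(truncate=False)
# the zipped array is [{a, 1}, {b, 2}, {null, 3}, {null, 4}]:
# missing slots in the shorter array become null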
CodePudding user response:
In the above question, I later realised, as @wwnde suggested, that there is no need to create a separate DataFrame for the dates.
The code provided below serves the purpose too -
import pyspark.sql.functions as F
from pyspark.sql import Window

# Partition the data by ID and order by DATE
window_fn = Window.partitionBy("ID").orderBy("DATE")

# End of the date range between the current row's DATE and the next row's;
# for the last row of each ID, fall back to DATE itself
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn),
                       F.col("DATE") + F.expr("interval 1 day"))
end_date_range = next_date - F.expr("interval 1 day")

# Use 'sequence' to generate all intermediate dates, then explode the array
# to create the rows for the missing dates (each keeps its row's Val)
final_result = (DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))
                .withColumn("DATE", F.explode("Ranges"))
                .withColumn("DATE", F.to_timestamp("DATE", "yyyy-MM-dd"))
                .drop("Ranges"))
display(final_result)
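For completeness, a minimal sketch to reproduce the question's DF and check the result (assumes a SparkSession named spark; DATE must be a real date type, otherwise F.sequence fails, and show() stands in for the Databricks-only display()):
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows from the question; to_date makes DATE a DateType column,
# which F.sequence requires
DF = spark.createDataFrame(
    [("2021-07-01", 81119.73, "Ax3838J"),
     ("2021-07-04", 81289.62, "Ax3838J"),
     ("2021-07-05", 81385.62, "Ax3838J"),
     ("2021-07-02", 81249.76, "Bz3838J"),
     ("2021-07-05", 81324.28, "Bz3838J"),
     ("2021-07-06", 81329.28, "Bz3838J")],
    ["DATE", "Val", "ID"],
).withColumn("DATE", F.to_date("DATE"))

# ...run the snippet above, then:
# final_result.orderBy("ID", "DATE").show()
# should match the expected output from the question (with DATE as timestamp)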