PySpark Conditional formatting on final DataFrame after joining two dataframes

Time:04-02

PySpark DataFrame Scenario:

  1. There is a DataFrame called DF. Its two main columns are ID and Date.
  2. Each ID has, on average, 40 unique dates (not contiguous).
  3. There is a second DataFrame called DF_date with a single column named Date. Its dates span the range between the minimum and maximum 'Date' in DF.
  4. The goal is to fill DF with continuous dates between the start and end date of each unique 'ID'. Missing dates are filled in with a left join between DF_date and DF, and Val is carried forward from the previous available date, as the expected output shows.

DF

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+

DF_date

+-------------+
|         Date|
+-------------+
|   2021-07-01|
|   2021-07-02|
|   2021-07-03|
|   2021-07-04|
|   2021-07-05|
|   2021-07-06|
+-------------+

Expected Final Output:

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-02|     81119.73|         Ax3838J|
|   2021-07-03|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-03|     81249.76|         Bz3838J|
|   2021-07-04|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+
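
To make the fill rule explicit, here is a minimal plain-Python sketch (no Spark) that reproduces the expected table from the sample rows in DF. The function name `fill_dates` and the tuple layout are assumptions made for illustration; it is only meant as a small reference for checking results on toy data, not as a replacement for a Spark solution.

```python
from datetime import date, timedelta

# Sample rows copied from the DF table above: (Date, Val, ID).
rows = [
    (date(2021, 7, 1), 81119.73, "Ax3838J"),
    (date(2021, 7, 4), 81289.62, "Ax3838J"),
    (date(2021, 7, 5), 81385.62, "Ax3838J"),
    (date(2021, 7, 2), 81249.76, "Bz3838J"),
    (date(2021, 7, 5), 81324.28, "Bz3838J"),
    (date(2021, 7, 6), 81329.28, "Bz3838J"),
]


def fill_dates(rows):
    """Hypothetical helper: one row per calendar day between each ID's
    first and last date, carrying the latest known Val forward."""
    by_id = {}
    for d, val, id_ in rows:
        by_id.setdefault(id_, {})[d] = val

    filled = []
    for id_ in sorted(by_id):
        known = by_id[id_]
        day, end = min(known), max(known)
        current = None
        while day <= end:
            # Use the value recorded for this day, else keep the previous one.
            current = known.get(day, current)
            filled.append((day, current, id_))
            day += timedelta(days=1)
    return filled


result = fill_dates(rows)
for d, val, id_ in result:
    print(d.isoformat(), val, id_)
```

Each ID is filled only between its own first and last date, which is why Ax3838J stops at 2021-07-05 and Bz3838J starts at 2021-07-02.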

CodePudding user response:

Your question is unclear. Why build a DF_date dataframe from the overall start and end dates, use it to fill in dates, and then fall back on the per-ID start and end dates from DF? Why not just fill in the missing dates using the minimum and maximum date of each group in DF?

Anyway, this is how you fill in the missing dates.

Following your comments, see my edits:

  from pyspark.sql import Window
  from pyspark.sql import functions as F

  new = (DF.groupby('ID')
       .agg(F.to_date(F.min('Date')).alias('min_date')  # minimum date per group
       ,F.to_date(F.max('Date')).alias('max_date')  # maximum date per group
       ,*[F.collect_list(i).alias(i) for i in DF.drop('ID').columns])  # Date and Val collected into arrays per group

       # Zip the arrays with the full date sequence and explode the result.
       # The sequence lands in a column named '2'; drop the old Date and rename '2' to Date.
       .selectExpr("ID","inline(arrays_zip(Date,Val,sequence(min_date,max_date,interval 1 day)))")
       .drop('Date').withColumnRenamed('2','Date')
       # Forward fill the Val column (last non-null value so far within each ID)
       .withColumn('Val', F.last('Val', True).over(Window.partitionBy('ID').orderBy('Date')))
      )

  new.show()

+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+

CodePudding user response:

As @wwnde suggested, I later realised that there is no need to create a separate DataFrame for the dates.

The code below also serves the purpose:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Partition the data by ID and order by DATE
window_fn = Window.partitionBy("ID").orderBy('DATE')

# Date of the following row within the same ID; for the last row of an ID,
# fall back to DATE + 1 day so that the range below ends at DATE itself
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn), F.col("DATE") + F.expr("interval 1 day"))

# Last date covered by the current row: the day before the next known date
end_date_range = next_date - F.expr("interval 1 day")

# Use 'sequence' to generate all intermediate dates, then explode the array
# so that each missing date becomes a row carrying the current row's values
final_result = DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.to_timestamp("date", 'yyyy-MM-dd'))\
  .drop("Ranges")

# display() is available in Databricks notebooks; use final_result.show() elsewhere
display(final_result)
