Join two timeseries dataframes to get the most recent right entry for each left entry in PySpark


I have two Spark DataFrames:

df1 with one entry per id and date:

|date       |id   |
|-----------|-----|
|2021-11-15 |    1|
|2021-11-14 |    1|
|2021-11-15 |    2|
|2021-11-14 |    2|
|2021-11-15 |    3|
|2021-11-14 |    3|

df2 with multiple log entries:

|date       |id   |
|-----------|-----|
|2021-11-13 |    1|
|2021-11-13 |    1|
|2021-11-13 |    3|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-15 |    1|
|2021-11-15 |    1|
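
For reproducibility, a minimal sketch that builds the two example DataFrames above (it assumes an active SparkSession named spark and parses the dates with to_date so they compare as dates rather than strings):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# df1: one row per id and date
df1 = spark.createDataFrame(
    [("2021-11-15", 1), ("2021-11-14", 1),
     ("2021-11-15", 2), ("2021-11-14", 2),
     ("2021-11-15", 3), ("2021-11-14", 3)],
    ["date", "id"],
).withColumn("date", F.to_date("date"))

# df2: multiple log entries, possibly several per id and date
df2 = spark.createDataFrame(
    [("2021-11-13", 1), ("2021-11-13", 1), ("2021-11-13", 3),
     ("2021-11-14", 1), ("2021-11-14", 1), ("2021-11-14", 1),
     ("2021-11-14", 1), ("2021-11-15", 1), ("2021-11-15", 1)],
    ["date", "id"],
).withColumn("date", F.to_date("date"))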

How can I join these DataFrames so that, for each id and date in df1, I get the most recent possible entry from df2, i.e. the greatest date(df2) that is <= date?

|date       |id    |   date(df2)| 
|-----------|------|------------|
|2021-11-15 |    1 | 2021-11-15 |
|2021-11-14 |    1 | 2021-11-14 |
|2021-11-15 |    2 |       null |
|2021-11-14 |    2 |       null |
|2021-11-15 |    3 | 2021-11-13 |
|2021-11-14 |    3 | 2021-11-13 |

Thanks, Into Numbers

CodePudding user response:

Use a left join, then group by df1.id and df1.date, and use conditional aggregation to take the maximum df2.date that is <= df1.date:

import pyspark.sql.functions as F

# Left-join df2 onto df1 by id (renaming df2's date column to avoid a clash),
# then, per (id, date), keep the latest df2 date that is not after df1's date.
result_df = df1.join(
    df2.withColumnRenamed("date", "df2_date"),
    ["id"],
    "left"
).groupBy("id", "date").agg(
    F.max(
        # Dates after df1's date become null, and max() ignores nulls
        F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
    ).alias("df2_date")
)

result_df.show()
#+---+----------+----------+
#| id|      date|  df2_date|
#+---+----------+----------+
#|  1|2021-11-14|2021-11-14|
#|  1|2021-11-15|2021-11-15|
#|  2|2021-11-14|      null|
#|  2|2021-11-15|      null|
#|  3|2021-11-14|2021-11-13|
#|  3|2021-11-15|2021-11-13|
#+---+----------+----------+
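
If df2 carried more columns than just the date and you wanted the whole matching row rather than only the aggregated date, a window-based variant could replace the aggregation. This is only a sketch (not part of the original answer) and assumes the same df1/df2 as above:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Null out df2 dates that fall after df1's date, then keep the best-ranked
# row per (id, date); nulls sort last, so unmatched ids keep one null row.
joined = df1.join(
    df2.withColumnRenamed("date", "df2_date"),
    ["id"],
    "left"
).withColumn(
    "df2_date",
    F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
)

w = Window.partitionBy("id", "date").orderBy(F.col("df2_date").desc_nulls_last())

result_df = (
    joined.withColumn("rn", F.row_number().over(w))
          .where(F.col("rn") == 1)
          .drop("rn")
)

Ties between several df2 rows with the same qualifying date are broken arbitrarily here; add further columns to the orderBy if you need a deterministic pick.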