Joining 2 DataFrames in PySpark


I am new to PySpark. I have data in the two tables below, loaded as DataFrames.

Table1:

Id Amount Date
1 £100 01/04/2021
1 £50 08/04/2021
2 £60 02/04/2021
2 £20 06/05/2021

Table2:

Id Status Date
1 S1 01/04/2021
1 S2 05/04/2021
1 S3 10/04/2021
2 S1 02/04/2021
2 S2 10/04/2021

I need to join those two DataFrames to produce the output below.

For every record in Table1, I need the record from Table2 that was valid as of that Date, and vice versa. For example, Table1 has £50 for Id=1 on 08/04/2021, but Table2 has a record for Id=1 on 05/04/2021 where the status changed to S2, so the status for 08/04/2021 is S2. I am not sure how to express this in a join condition.

What's an efficient way of achieving this?

Expected Output:

Id Status Date Amount
1 S1 01/04/2021 £100
1 S2 05/04/2021 £100
1 S2 08/04/2021 £50
1 S3 10/04/2021 £50
2 S1 02/04/2021 £60
2 S2 10/04/2021 £60
2 S2 06/05/2021 £20
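
For reference, the sample tables can be created as DataFrames like this (a minimal setup sketch; df1 and df2 are the names used in the answer below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Table1: amounts per Id over time
df1 = spark.createDataFrame(
    [(1, "£100", "01/04/2021"), (1, "£50", "08/04/2021"),
     (2, "£60", "02/04/2021"), (2, "£20", "06/05/2021")],
    ["Id", "Amount", "Date"],
)

# Table2: status changes per Id over time
df2 = spark.createDataFrame(
    [(1, "S1", "01/04/2021"), (1, "S2", "05/04/2021"), (1, "S3", "10/04/2021"),
     (2, "S1", "02/04/2021"), (2, "S2", "10/04/2021")],
    ["Id", "Status", "Date"],
)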

CodePudding user response:

Use a full outer join on Id and Date, then the lag window function to fill Status and Amount from the closest preceding Date row within each Id:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Window over each Id, ordered chronologically (Date holds dd/MM/yyyy strings)
w = Window.partitionBy("Id").orderBy(F.to_date("Date", "dd/MM/yyyy"))

# The full outer join keeps every (Id, Date) from both tables; a row that
# exists in only one table gets nulls in the other table's columns, which
# we then fill from the previous row of the same Id.
joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
    "Status",
    F.coalesce(F.col("Status"), F.lag("Status").over(w))
).withColumn(
    "Amount",
    F.coalesce(F.col("Amount"), F.lag("Amount").over(w))
)

joined_df.show()
#+---+----------+------+------+
#| Id|      Date|Amount|Status|
#+---+----------+------+------+
#|  1|01/04/2021|  £100|    S1|
#|  1|05/04/2021|  £100|    S2|
#|  1|08/04/2021|   £50|    S2|
#|  1|10/04/2021|   £50|    S3|
#|  2|02/04/2021|   £60|    S1|
#|  2|10/04/2021|   £60|    S2|
#|  2|06/05/2021|   £20|    S2|
#+---+----------+------+------+
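
Note that lag only looks one row back, so the coalesce-with-lag trick relies on there being at most one consecutive null per column within an Id, which happens to hold for this data. If several consecutive joined rows could be missing a value, a more robust variant of the same idea (a sketch, reusing the names above) is last with ignorenulls=True over a frame running from the start of the partition to the current row:

# Frame from the first row of the Id up to the current row
w2 = (Window.partitionBy("Id")
      .orderBy(F.to_date("Date", "dd/MM/yyyy"))
      .rowsBetween(Window.unboundedPreceding, Window.currentRow))

joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
    "Status", F.last("Status", ignorenulls=True).over(w2)  # last non-null Status so far
).withColumn(
    "Amount", F.last("Amount", ignorenulls=True).over(w2)  # last non-null Amount so far
)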