I am new to PySpark. I have data like this in two tables, shown below. I am using DataFrames.
Table1:
Id | Amount | Date |
---|---|---|
1 | £100 | 01/04/2021 |
1 | £50 | 08/04/2021 |
2 | £60 | 02/04/2021 |
2 | £20 | 06/05/2021 |
Table2:
Id | Status | Date |
---|---|---|
1 | S1 | 01/04/2021 |
1 | S2 | 05/04/2021 |
1 | S3 | 10/04/2021 |
2 | S1 | 02/04/2021 |
2 | S2 | 10/04/2021 |
I need to join those two DataFrames to produce the output below. For every record in Table1, I need the record from Table2 that was valid as of that date, and vice versa. For example, Table1 has `£50` for `Id=1` on `08/04/2021`, but Table2 has a record for `Id=1` on `05/04/2021` where the status changed to `S2`, so for `08/04/2021` the status is `S2`. I am not sure what join condition would produce this output. What is an efficient way of achieving this?
Expected Output:
Id | Status | Date | Amount |
---|---|---|---|
1 | S1 | 01/04/2021 | £100 |
1 | S2 | 05/04/2021 | £100 |
1 | S2 | 08/04/2021 | £50 |
1 | S3 | 10/04/2021 | £50 |
2 | S1 | 02/04/2021 | £60 |
2 | S2 | 10/04/2021 | £60 |
2 | S2 | 06/05/2021 | £20 |
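
For reference, a minimal sketch of how the two tables can be reproduced as DataFrames (the column types are kept as plain strings for simplicity; the names df1 and df2 match the ones used in the answer below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Table1: one amount per Id and date (Amount and Date kept as strings)
df1 = spark.createDataFrame(
    [(1, "£100", "01/04/2021"), (1, "£50", "08/04/2021"),
     (2, "£60", "02/04/2021"), (2, "£20", "06/05/2021")],
    ["Id", "Amount", "Date"],
)

# Table2: status changes per Id and date
df2 = spark.createDataFrame(
    [(1, "S1", "01/04/2021"), (1, "S2", "05/04/2021"), (1, "S3", "10/04/2021"),
     (2, "S1", "02/04/2021"), (2, "S2", "10/04/2021")],
    ["Id", "Status", "Date"],
)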
CodePudding user response:
Use a full join on `Id` and `Date`, then the `lag` window function to get the values of `Status` and `Amount` from the closest preceding `Date` row:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Window per Id, ordered by the parsed date ("Date" holds dd/MM/yyyy strings)
w = Window.partitionBy("Id").orderBy(F.to_date("Date", "dd/MM/yyyy"))

# Full outer join on Id + Date, then fill each null side from the previous row in the window
joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
    "Status",
    F.coalesce(F.col("Status"), F.lag("Status").over(w))
).withColumn(
    "Amount",
    F.coalesce(F.col("Amount"), F.lag("Amount").over(w))
)
joined_df.show()
#+---+----------+------+------+
#| Id|      Date|Amount|Status|
#+---+----------+------+------+
#|  1|01/04/2021|  £100|    S1|
#|  1|05/04/2021|  £100|    S2|
#|  1|08/04/2021|   £50|    S2|
#|  1|10/04/2021|   £50|    S3|
#|  2|02/04/2021|   £60|    S1|
#|  2|10/04/2021|   £60|    S2|
#|  2|06/05/2021|   £20|    S2|
#+---+----------+------+------+
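
Note that `lag` only looks back a single row, so if two or more consecutive rows in the window came from the same table, the fill could stay null. A sketch of a more defensive variant of the same idea (not part of the original answer) forward-fills with `last(..., ignorenulls=True)` over all preceding rows:

# Forward-fill from the closest preceding non-null value, however far back it is
w_fill = (Window.partitionBy("Id")
          .orderBy(F.to_date("Date", "dd/MM/yyyy"))
          .rowsBetween(Window.unboundedPreceding, Window.currentRow))

joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
    "Status", F.last("Status", ignorenulls=True).over(w_fill)
).withColumn(
    "Amount", F.last("Amount", ignorenulls=True).over(w_fill)
)

On the sample data this produces the same output as above.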