I have two dataframes like this:
df1 = spark.createDataFrame([(1, 11, 1999, 1999, None), (2, 22, 2000, 2000, 44), (3, 33, 2001, 2001,None)], ['id', 't', 'year','new_date','rev_t'])
df2 = spark.createDataFrame([(2, 44, 2022, 2022,None), (2, 55, 2001, 2001, 88)], ['id', 't', 'year','new_date','rev_t'])
df1.show()
df2.show()
+---+---+----+--------+-----+
| id|  t|year|new_date|rev_t|
+---+---+----+--------+-----+
|  1| 11|1999|    1999| null|
|  2| 22|2000|    2000|   44|
|  3| 33|2001|    2001| null|
+---+---+----+--------+-----+

+---+---+----+--------+-----+
| id|  t|year|new_date|rev_t|
+---+---+----+--------+-----+
|  2| 44|2022|    2022| null|
|  2| 55|2001|    2001|   88|
+---+---+----+--------+-----+
I want to join them in such a way that whenever df2.t == df1.rev_t, new_date in the result dataframe is updated to df2.year.
So it should look like this:
+---+---+----+--------+-----+
| id|  t|year|new_date|rev_t|
+---+---+----+--------+-----+
|  1| 11|1999|    1999| null|
|  2| 22|2000|    2022|   44|
|  2| 44|2022|    2022| null|
|  2| 55|2001|    2001|   88|
|  3| 33|2001|    2001| null|
+---+---+----+--------+-----+
CodePudding user response:
To update a column of df1 from df2, use a left join together with the coalesce function on the column you want to update, in this case new_date. From your expected output, it appears you also want to append the rows of df2, so union the join result with df2:
from pyspark.sql import functions as F

# Rename df2.t to rev_t so it can serve as the join key, and alias df2's
# new_date to avoid a column-name clash after the join.
result = (df1.join(df2.selectExpr("t as rev_t", "new_date as df2_new_date"), ["rev_t"], "left")
          # take df2's new_date where a match exists, otherwise keep df1's
          .withColumn("new_date", F.coalesce("df2_new_date", "new_date"))
          # restore df1's original column order, then append the rows of df2
          .select(*df1.columns).union(df2)
          )
result.show()
# +---+---+----+--------+-----+
# | id|  t|year|new_date|rev_t|
# +---+---+----+--------+-----+
# |  1| 11|1999|    1999| null|
# |  3| 33|2001|    2001| null|
# |  2| 22|2000|    2022|   44|
# |  2| 44|2022|    2022| null|
# |  2| 55|2001|    2001|   88|
# +---+---+----+--------+-----+
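Note that union resolves columns by position, which is why the join result is re-selected with df1's column order before the union. Also, Spark does not guarantee any row order after a union, so the rows may come back in a different order than in your expected output. If the order matters, sort explicitly; a minimal sketch, assuming id and then t are the sort keys you want:

# sort keys are an assumption here; pick whatever ordering you actually need
result.orderBy("id", "t").show()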