Home > Software design >  How to get updated or new records by comparing two dataframe in pyspark
How to get updated or new records by comparing two dataframe in pyspark

Time:09-17

I have two dataframes like this:

df2.show()

 ---- ------- ------ 
|NAME|BALANCE|SALARY|
 ---- ------- ------ 
|PPan|     11|   500|
|Liza|     20|   900|
 ---- ------- ------ 

df3.show()

 ---- ------- ------ 
|NAME|BALANCE|SALARY|
 ---- ------- ------ 
|PPan|     10|   700|
| Cal|     70|   888|
 ---- ------- ------ 

df2 here, represents existing database records and df3 represents new records/updated records(any column) which need to be inserted/updated into db.For ex: NAME=PPan the new balance is 10 as per df3. so For NAME=PPan entire row has to be replaced in df2 and for NAME=Cal, a new row has to be added and for name=Liza will be untouched like this:

     ---- ------- ------ 
    |NAME|BALANCE|SALARY|
     ---- ------- ------ 
    |PPan|     10|   700|
    |Liza|     20|   900|
    | Cal|     70|   888|
     ---- ------- ------ 

How can I achieve this use case?

CodePudding user response:

First you need to join both dataframes using full method to keep unmatched rows (new) and to updating the matched records I do prefer to use select with coalesce function:

joined_df = df2.alias('rec').join(df3.alias('upd'), on='NAME', how='full')
#  ---- ------- ------ ------- ------ 
# |NAME|BALANCE|SALARY|BALANCE|SALARY|
#  ---- ------- ------ ------- ------ 
# |Cal |null   |null  |70     |888   |
# |Liza|20     |900   |null   |null  |
# |PPan|11     |500   |10     |700   |
#  ---- ------- ------ ------- ------ 

output_df = joined_df.selectExpr(
  'NAME', 
  'COALESCE(upd.BALANCE, rec.BALANCE) BALANCE', 
  'COALESCE(upd.SALARY, rec.SALARY) SALARY'
)

output_df.sort('BALANCE').show(truncate=False)
 ---- ------- ------ 
|NAME|BALANCE|SALARY|
 ---- ------- ------ 
|PPan|10     |700   |
|Liza|20     |900   |
|Cal |70     |888   |
 ---- ------- ------ 
  • Related