I made a DataFrame in Spark.
The DataFrame contains some rows that are new and some rows that already exist in a database table (matched by key columns).
I need to insert the new rows and update the existing ones (an upsert).
For example:
Dataframe:
Key1 | Key2 | Value |
---|---|---|
1 | 11 | new value |
2 | 22 | value |
Table in database:
Key1 | Key2 | Value |
---|---|---|
1 | 11 | old value |
3 | 33 | other value |
I need to write my DataFrame to the database and get the following result:
Table in database:
Key1 | Key2 | Value |
---|---|---|
1 | 11 | new value |
2 | 22 | value |
3 | 33 | other value |
Where
(1, 11) was updated
(2, 22) was inserted
(3, 33) wasn't changed
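The desired semantics can be illustrated with plain Python dicts keyed by the `(Key1, Key2)` pair (no Spark or database involved, just the merge rule itself):

```python
# Rows keyed by (Key1, Key2); values mirror the example tables above.
table = {
    (1, 11): "old value",
    (3, 33): "other value",
}
dataframe = {
    (1, 11): "new value",
    (2, 22): "value",
}

# An upsert: keys present in both are updated, new keys are inserted,
# keys only in the table keep their old values.
table.update(dataframe)

print(sorted(table.items()))
# [((1, 11), 'new value'), ((2, 22), 'value'), ((3, 33), 'other value')]
```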
I guess there are two possible solutions:
- Merge the data into a new DataFrame and fully rewrite the table in the database
- Or insert/update data from the DataFrame directly into the database
I have no idea how to approach this. Which tools should I use for this task? Help me understand which direction to take.
CodePudding user response:
Unfortunately, there is no SaveMode.Upsert in Spark currently. SaveMode.Overwrite will replace the existing table with your DataFrame, so rows that exist only in the table (like (3, 33)) would be lost unless you merge them into the DataFrame first.
Alternatively, you can repartition the DataFrame, open a JDBC/PostgreSQL connection per partition (e.g. in foreachPartition), and perform a batched upsert.
https://medium.com/@thomaspt748/how-to-upsert-data-into-relational-database-using-spark-7d2d92e05bb9
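For PostgreSQL, the statement executed per row inside foreachPartition would be an `INSERT ... ON CONFLICT ... DO UPDATE`. A minimal sketch of building that SQL (the table name `target_table` and columns `key1`, `key2`, `value` are taken from the example above and are assumptions, not fixed names):

```python
def build_upsert_sql(table, key_cols, value_cols):
    """Build a PostgreSQL INSERT ... ON CONFLICT ... DO UPDATE statement
    with %s placeholders, suitable for a parameterized cursor.execute()."""
    all_cols = key_cols + value_cols
    placeholders = ", ".join(["%s"] * len(all_cols))
    # On a key conflict, overwrite the non-key columns with the new values.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in value_cols)
    return (
        f"INSERT INTO {table} ({', '.join(all_cols)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(key_cols)}) DO UPDATE SET {updates}"
    )

sql = build_upsert_sql("target_table", ["key1", "key2"], ["value"])
print(sql)
```

Inside `df.foreachPartition`, each executor would open its own connection and run this statement in batches for the rows of its partition; this avoids serializing a shared connection across the cluster.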