How to merge a spark dataframe with hive table on Databricks Deltalake?


I have a dataframe as below:

val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- role: string (nullable = true)

I have a Hive table with the same columns and the exact same schema:

val df2 = spark.sql("select * from db.table")

From the incoming dataframe df1 I have two new records and two updated records. The existing rows for the two updated keys currently look like this:

val df2 = spark.sql("select * from db.table where name in ('James', 'Michael')")
df2.show()
+-------+-------+---+----------+
|   name|   dept| id|      role|
+-------+-------+---+----------+
|  James|  Sales| 34| Associate|
|Michael|  Sales| 56|    Junior|
+-------+-------+---+----------+

The keys in use here are dept and id.

In one of my previous projects, we used to join the incoming dataframe with the matching partition of our Hive table into a staging table, and then run EXCHANGE PARTITION to swap the existing Hive partition with the staging table that held the merged data; a rough sketch of that pattern is shown below.
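
For illustration, that staging + EXCHANGE PARTITION pattern might look roughly like the following (the table names and the dept='Sales' partition value are made up for this example, and the EXCHANGE PARTITION statement is Hive DDL that we ran on the Hive side rather than through Spark):

// Hypothetical sketch of the old staging + EXCHANGE PARTITION approach (names are illustrative).
// Existing rows of the partition being refreshed:
val existing = spark.sql("select * from db.table where dept = 'Sales'")

// Keep every incoming row, plus existing rows whose (dept, id) keys are not being updated.
val merged = df1.unionByName(
  existing.join(df1.select("dept", "id"), Seq("dept", "id"), "left_anti"))

// Write the merged result to a staging table...
merged.write.mode("overwrite").saveAsTable("db.table_staging")

// ...then swap the partition on the Hive side (Hive DDL, shown here only as a comment):
// ALTER TABLE db.table EXCHANGE PARTITION (dept='Sales') WITH TABLE db.table_staging;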

We are using the Databricks distribution of Spark. Our Hive table is built on Databricks Delta Lake and has millions of rows. Is there any other way I can merge my incoming dataframe df1 with my Hive table? If so, how can I achieve it without a performance hit?

CodePudding user response:

As Tim mentioned, if your destination table is already on Delta, then you just need to use the MERGE INTO SQL command, or the corresponding Scala API (see the docs on Delta merge). You need something like this:

import io.delta.tables._
import org.apache.spark.sql.functions._

// db.table is the existing Delta table; df1 holds the incoming records.
DeltaTable.forName(spark, "db.table")
  .as("target")
  .merge(
    df1.as("updates"),
    "target.dept = updates.dept and target.id = updates.id")
  .whenMatched
  .updateAll()     // rows whose (dept, id) already exist get all columns overwritten
  .whenNotMatched
  .insertAll()     // new rows are inserted as-is
  .execute()
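
The same merge can also be written with the MERGE INTO SQL command mentioned above. A minimal sketch, assuming the incoming dataframe is exposed to SQL as a temporary view (the view name updates is just for the example):

// Register the incoming dataframe so SQL can reference it (view name is illustrative).
df1.createOrReplaceTempView("updates")

spark.sql("""
  MERGE INTO db.table AS target
  USING updates
  ON target.dept = updates.dept AND target.id = updates.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")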

The not-matched data will be inserted as-is, while the matched data will be written into rewritten files that replace the files holding the original records. That rewrite is usually the main performance hit, so you may need to decrease the file size so that less data has to be rewritten (see docs). In newer versions it's also possible to configure the table so that the Databricks Spark engine automatically finds an optimal file size, reducing the rewrite time without affecting the read patterns (see docs).
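
If you do want to tune file sizes, a hedged sketch using Databricks Delta table properties (property names as documented in Databricks' file-size tuning docs; the 32mb value is only an example):

// Let Databricks tune file sizes automatically for tables that are frequently rewritten by MERGE.
spark.sql("ALTER TABLE db.table SET TBLPROPERTIES ('delta.tuneFileSizesForRewrites' = 'true')")

// Or hint a smaller target file size explicitly (the value here is illustrative).
spark.sql("ALTER TABLE db.table SET TBLPROPERTIES ('delta.targetFileSize' = '32mb')")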
