How to specify nested partitions in merge query while trying to merge incremental data with a base table

Time:02-18

I am trying to merge a DataFrame that contains incremental data into my base table, as per the Databricks documentation.

base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'),
           condition='base.key1 = inc.key1 and base.key2 = inc.key2') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

The above operation works fine, but it takes a lot of time, as expected, since many unwanted partitions are being scanned. I came across a Databricks documentation page with a merge query that has partitions specified in it.

Code from that link:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

In that example the partitions are specified in the IN condition as 1, 2, 3, and so on. But in my case, the table is first partitioned on COUNTRY values (USA, UK, NL, FR, IND), and then every country has a partition on YYYY-MM, e.g. 2020-01, 2020-02, 2020-03. How can I specify the partition values if I have a nested structure like the one mentioned above? Any help is massively appreciated.

CodePudding user response:

Yes, you can do that, and it's really recommended, because otherwise Delta Lake needs to scan all the data matching the ON condition. If you're using the Python API, you just need to use the correct SQL expression as the condition, and you can put restrictions on the partition columns into it, something like this in your case (here date is the date column from the update DataFrame):

base.country = 'country1' and base.date = inc.date and
  base.key1 = inc.key1 and base.key2 = inc.key2
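
Plugged back into the Python merge call, a minimal sketch could look like the following (the 'IND' value and the /path/to/base_table location are illustrative, not from the question):

from delta.tables import DeltaTable

# Load the partitioned base table (illustrative path).
base_delta = DeltaTable.forPath(spark, '/path/to/base_table')

base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'),
           condition="base.country = 'IND' and base.date = inc.date and "
                     "base.key1 = inc.key1 and base.key2 = inc.key2") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

Because the condition pins the partition columns, Delta can prune to the matching country/date partitions instead of scanning the whole table.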

If you have multiple countries, then you can use IN ('country1', 'country2'), but it would be easier to have the country inside your update DataFrame and match using base.country = inc.country; see the sketch below.
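
A sketch of that dynamic variant, assuming the incremental DataFrame carries country and date columns (column names are assumptions, not from the question):

# Collect the distinct countries present in this batch
# (assumes kafka_df has a 'country' column).
countries = [row['country'] for row in
             kafka_df.select('country').distinct().collect()]
in_list = ', '.join("'{}'".format(c) for c in countries)

# Restrict to the touched country partitions, then match rows
# partition-by-partition via inc.country / inc.date.
cond = ("base.country IN ({}) and base.country = inc.country and "
        "base.date = inc.date and "
        "base.key1 = inc.key1 and base.key2 = inc.key2").format(in_list)

base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'), condition=cond) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()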
