I am trying to merge a DataFrame that contains incremental data into my base table, as per the Databricks documentation.
base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'),
           condition='base.key1 = inc.key1 and base.key2 = inc.key2') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
The above operation works fine, but it takes a lot of time, as expected, since a lot of unwanted partitions are being scanned. I came across a Databricks documentation page here with a merge query that has the partitions specified in it.
Code from that link:
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
""".stripMargin)
The partitions are specified in the IN condition as 1, 2, 3...
But in my case, the table is first partitioned on COUNTRY with values USA, UK, NL, FR, IND, and then every country has a partition on YYYY-MM, e.g. 2020-01, 2020-02, 2020-03.
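For context, the kind of write that produces this nested layout looks roughly like this (the column names country and month and the path are only illustrative, not my exact schema):
# Illustrative only: column names and path are placeholders.
(df.write
   .format('delta')
   .partitionBy('country', 'month')   # creates country=IND/month=2020-03/... directories
   .mode('append')
   .save('/mnt/delta/base_table'))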
How can I specify the partition values if I have a nested structure like the one mentioned above?
Any help is massively appreciated.
CodePudding user response:
Yes, you can do that, and it's really recommended, because Delta Lake needs to scan all the data that match the ON condition. If you're using the Python API, you just need to use the correct SQL expression as the condition, and you can put restrictions on the partition columns into it, something like this in your case (date is the column coming from the update data):
base.country = 'country1' and base.date = inc.date and
base.key1=inc.key1 and base.key2=inc.key2
If you have multiple countries, then you can use IN ('country1', 'country2'), but it would be easier to have country inside your update DataFrame and match using base.country = inc.country.
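Putting it together, a sketch of the full merge with the partition restriction added to the condition could look like this (the table path and the date column name are assumptions, not taken from your code):
from delta.tables import DeltaTable

# Sketch only: the path and the `date` partition column name are assumptions.
base_delta = DeltaTable.forPath(spark, '/mnt/delta/base_table')

base_delta.alias('base') \
    .merge(source=kafka_df.alias('inc'),
           # partition columns first, so files outside those partitions can be skipped
           condition="base.country = inc.country and base.date = inc.date "
                     "and base.key1 = inc.key1 and base.key2 = inc.key2") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()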