I have a case class as follows:
case class UniqueException(
ExceptionId:String,
LastUpdateTime:Timestamp,
IsDisplayed:Boolean,
Message:String,
ExceptionType:String,
ExceptionMessage:String,
FullException:String
)
This is used to generate a Delta table.
The following conditions need to be met:
- A new UniqueException needs to be inserted if the ExceptionId of the UniqueException is new to the Delta table.
- An existing UniqueException needs to be updated if the ExceptionId of the incoming UniqueException already exists in the Delta table and the LastUpdateTime of the incoming UniqueException is more than 14 days later than the existing one.
- If the difference in LastUpdateTime is less than 14 days, the existing UniqueException should not be updated, even though the ExceptionId of the incoming UniqueException already exists in the Delta table.
I wrote the following code, but it doesn't satisfy the above cases.
val dfUniqueException = DeltaTable.forPath(outputFolder)
dfUniqueException.as("existing")
.merge(dfNewExceptions.as("new"), "new.ExceptionId = existing.ExceptionId and new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Any idea how the above conditions can be fulfilled with a single merge statement?
Answer:
Actually, your rules can be rewritten as follows:
- if there is an existing exception
  - if the LastUpdateTime difference between existing and new is more than 14 days, update existing
  - else do nothing
- else insert new exception
So you can change your code to put the "14 days rule" in the whenMatched clause and not in the merge clause, as follows:
import io.delta.tables.DeltaTable
val dfUniqueException = DeltaTable.forPath(outputFolder)
val dfNewExceptionLabeled = dfNewExceptions.as("new")
dfUniqueException.as("existing")
.merge(dfNewExceptionLabeled, "new.ExceptionId = existing.ExceptionId")
.whenMatched("new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)")
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
If you apply this code with the following existing exceptions:
+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_02|2021-03-10 00:00:00|value_02|
|exception_id_03|2021-03-10 00:00:00|value_03|
+---------------+-------------------+--------+
And the following new exceptions:
+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_02|2021-03-20 00:00:00|value_04|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_04|2021-03-31 00:00:00|value_06|
+---------------+-------------------+--------+
Your final result in the Delta table is:
+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_04|2021-03-31 00:00:00|value_06|
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_02|2021-03-10 00:00:00|value_02|
+---------------+-------------------+--------+
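If you want to sanity-check the rules without a Spark session, here is a minimal sketch that models the same merge in plain Scala collections. It is only an illustration, not Delta code: ExceptionRow, MergeSketch, shouldUpdate and merge are hypothetical names, the case class is trimmed to the three columns shown in the tables, and I am assuming that date_add works on the date part of the timestamp (so the threshold is midnight, 14 days after the existing date) and that each ExceptionId appears at most once in the incoming batch.

```scala
import java.sql.Timestamp

// Hypothetical, trimmed-down stand-in for UniqueException (only the columns shown in the tables).
case class ExceptionRow(ExceptionId: String, LastUpdateTime: Timestamp, Message: String)

object MergeSketch {
  // Models `new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)`.
  // Assumption: date_add truncates to the date, so the threshold is midnight 14 days later.
  def shouldUpdate(existing: Timestamp, incoming: Timestamp): Boolean = {
    val threshold = existing.toLocalDateTime.toLocalDate.plusDays(14).atStartOfDay
    incoming.toLocalDateTime.isAfter(threshold)
  }

  // In-memory model of the merge: matched + 14-day rule => update,
  // matched otherwise => keep existing, not matched => insert.
  def merge(existing: Seq[ExceptionRow], incoming: Seq[ExceptionRow]): Seq[ExceptionRow] = {
    val byId = existing.map(r => r.ExceptionId -> r).toMap
    val merged = incoming.foldLeft(byId) { (acc, n) =>
      acc.get(n.ExceptionId) match {
        case Some(old) if shouldUpdate(old.LastUpdateTime, n.LastUpdateTime) =>
          acc.updated(n.ExceptionId, n) // whenMatched(condition).updateAll()
        case Some(_) =>
          acc                           // matched, but condition is false: do nothing
        case None =>
          acc.updated(n.ExceptionId, n) // whenNotMatched().insertAll()
      }
    }
    merged.values.toSeq.sortBy(_.ExceptionId)
  }

  def main(args: Array[String]): Unit = {
    def ts(s: String): Timestamp = Timestamp.valueOf(s)
    val existing = Seq(
      ExceptionRow("exception_id_01", ts("2021-03-10 00:00:00"), "value_01"),
      ExceptionRow("exception_id_02", ts("2021-03-10 00:00:00"), "value_02"),
      ExceptionRow("exception_id_03", ts("2021-03-10 00:00:00"), "value_03")
    )
    val incoming = Seq(
      ExceptionRow("exception_id_02", ts("2021-03-20 00:00:00"), "value_04"),
      ExceptionRow("exception_id_03", ts("2021-03-31 00:00:00"), "value_05"),
      ExceptionRow("exception_id_04", ts("2021-03-31 00:00:00"), "value_06")
    )
    // Rows are printed sorted by ExceptionId, unlike the unordered Delta output above.
    merge(existing, incoming).foreach(println)
  }
}
```

With the sample data above, exception_id_02 keeps value_02 (only 10 days apart), exception_id_03 becomes value_05 (21 days apart) and exception_id_04 is inserted, which matches the final table.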