I am continuously updating a Delta table in Databricks using:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, delta_path)
delta_table.alias("t").merge(
    df.alias("s"), ...  # join condition
).whenMatchedUpdate(
    set=...  # update columns
).whenNotMatchedInsert(
    values=...  # insert columns
).execute()
Later, I would like to query different versions of the table using a timestamp. For example:
df = (
    spark.read.format("delta")
    .option("timestampAsOf", "2022-11-14 12:00:00")
    .load("s3://BUCKET/PREFIX/")
)
However, the timestamp here corresponds to the exact time at which the merge was executed. If I happen to know that a particular data set should correspond to a specific timestamp (if it's a dump from last week, for example), is there any way to set that timestamp as the associated timestamp, rather than have it be whatever the time is when the merge is executed? This would be particularly useful when re-ingesting data, among other use cases.
CodePudding user response:
I'm not aware of any option to do so, but there are other options which might be relevant.
- Adding user-defined metadata to the merge commit; here you could log your own timestamp, and you can then view it with DESCRIBE HISTORY (see the first sketch after this list). From the Databricks docs:
df.write.format("delta") \
    .mode("overwrite") \
    .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
    .save("/tmp/delta/people10m")
- Simply have a log table where you record the table name, the version, and the desired timestamp (see the second sketch below).
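As far as I know, the userMetadata option is only exposed on the DataFrameWriter, but for a merge the same metadata can be attached through the SparkSession configuration spark.databricks.delta.commitInfo.userMetadata. A minimal sketch, assuming spark, df and delta_path exist as in the question, that the rows join on an id column, and with a made-up ingest timestamp value:

from delta.tables import DeltaTable

delta_path = "s3://BUCKET/PREFIX/"
logical_ts = "2022-11-07 00:00:00"  # the timestamp this data set should correspond to

# Attach your own timestamp to the commit produced by the merge
spark.conf.set("spark.databricks.delta.commitInfo.userMetadata", f"ingest_ts={logical_ts}")

delta_table = DeltaTable.forPath(spark, delta_path)
delta_table.alias("t").merge(
    df.alias("s"),
    "t.id = s.id",  # hypothetical join condition
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# Unset it again so unrelated commits do not inherit the metadata
spark.conf.unset("spark.databricks.delta.commitInfo.userMetadata")

# Later: look up the version that carries that metadata and time travel to it
history = delta_table.history()  # same columns as DESCRIBE HISTORY, including userMetadata
version = (
    history.filter(history.userMetadata == f"ingest_ts={logical_ts}")
    .orderBy("version", ascending=False)
    .first()["version"]
)
df_old = spark.read.format("delta").option("versionAsOf", version).load(delta_path)

Storing it as a key=value string keeps it easy to filter on later, but any string works.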
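The second option can be as simple as a small Delta table that maps each load to the version it produced. A sketch continuing from the previous one (same imports, delta_path and logical_ts); the log path and column names are made up for illustration:

log_path = "s3://BUCKET/ingestion_log/"  # hypothetical location for the mapping table

# Right after the merge, record which version it produced
latest_version = DeltaTable.forPath(spark, delta_path).history(1).first()["version"]
spark.createDataFrame(
    [(delta_path, latest_version, logical_ts)],
    "table_path STRING, version LONG, logical_ts STRING",
).write.format("delta").mode("append").save(log_path)

# Later: resolve the logical timestamp to a version and read that version
log = spark.read.format("delta").load(log_path)
version = (
    log.filter((log.table_path == delta_path) & (log.logical_ts == logical_ts))
    .orderBy("version", ascending=False)
    .first()["version"]
)
df_old = spark.read.format("delta").option("versionAsOf", version).load(delta_path)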
A relevant heads-up: by default, the ability to time travel a Delta table is only guaranteed for up to 30 days. This can be configured using the table properties delta.logRetentionDuration and delta.deletedFileRetentionDuration; the VACUUM operation (used for cleaning up the Parquet files of older versions of the table) also plays a role here. See the docs.
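If the defaults are too short for your use case, the retention can be raised per table. A sketch for a path-based table, with 90 days as an arbitrary example value (longer retention means more old files kept in storage):

spark.sql("""
    ALTER TABLE delta.`s3://BUCKET/PREFIX/`
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 90 days',
        'delta.deletedFileRetentionDuration' = 'interval 90 days'
    )
""")

Keep in mind that an explicit VACUUM with a shorter retention period can still remove the data files that older versions depend on.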