I am trying to implement Auto Loader with MERGE INTO for multiple tables, using the code below as described in the documentation:
from delta.tables import DeltaTable

def upsert_data(df, epoch_id):
    deltaTable = DeltaTable.forPath(spark, target_location)
    deltaTable.alias("t").merge(df.alias("s"),
            "t.xx = s.xx and t.xx1 = s.xx1") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

for i in range(len(list_of_files()[0])):
    schema = list_of_files()[2][i]
    raw_data = list_of_files()[1][i]
    checkpoint = list_of_files()[3][i]
    target_location = list_of_files()[4][i]

    dfSource = list_of_files(raw_data)
    dfMergedSchema = dfSource.where("1=0")
    dfMergedSchema.createOrReplaceGlobalTempView("test1")
    dfMergedSchema.write.option("mergeSchema", "true").mode("append").format("delta") \
        .save(target_location)

    stream = spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .option("header", "true") \
        .schema(schema) \
        .load(raw_data)

    stream.writeStream.format("delta") \
        .outputMode("append") \
        .foreachBatch(upsert_data) \
        .option("dataChange", "false") \
        .trigger(once=True) \
        .option("checkpointLocation", checkpoint) \
        .start()
My scenario: We have a Landing Zone where Parquet files are appended into multiple folders for example as shown below:
Landing Zone ---|
|-----folder 0 ---|----parquet1
| |----parquet2
| |----parquet3
|
|
|-----folder 1 ---|----parquet1
| |----parquet2
| |----parquet3
I then need Auto Loader to create the tables and their checkpoints as shown below:
Staging Zone ---|
|-----folder 0 ---|----checkpoint
| |----table
|
|
|
|-----folder 1 ---|----checkpoint
| |----table
|
I am noticing that without the foreachBatch option in the writeStream, but with trigger once, the code works as expected for inserts into multiple tables as shown above. The code also works when both foreachBatch and the trigger are used on an individual table without the for loop. However, when I enable both options (foreachBatch and trigger once) for multiple tables in the for loop, Auto Loader merges all the table contents into one table. In the Staging Zone, folder 0 gets a checkpoint but no table contents, while folder 1 gets a checkpoint plus the Delta files that make up the table contents for both folder 0 and folder 1 in its table folder. It is merging both tables into one.
I also get the ConcurrentAppendException.
I read about ConcurrentAppendException in the documentation, and what I found is that you should either partition the table or use a disjoint condition in the upsert_data function passed into the foreachBatch option of the writeStream. I tried both, and neither works.
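For context, a "disjoint condition" here means pinning each stream's merge to its own partition so that concurrent MERGEs touch non-overlapping data. The sketch below is hypothetical: the helper and the partition column name ("folder_id") are assumptions, not part of the original code; only the join keys (xx, xx1) come from the question.

```python
# Hypothetical sketch: build a merge condition that pins each stream to its
# own partition so concurrent MERGEs touch disjoint data. The partition
# column "folder_id" is an assumed name, not from the original code.
def build_merge_condition(join_keys, partition_col, partition_value):
    # Join on the business keys, e.g. t.xx = s.xx and t.xx1 = s.xx1
    key_clause = " and ".join(f"t.{k} = s.{k}" for k in join_keys)
    # Pin the target partition so this merge cannot scan other partitions
    return f"t.{partition_col} = '{partition_value}' and {key_clause}"
```

The condition for folder 0 would then read "t.folder_id = '0' and t.xx = s.xx and t.xx1 = s.xx1", which Delta can use to rule out conflicts with a concurrent merge against folder 1.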
How can one isolate the streams for the different folders in this scenario for the Staging Zone while using foreachBatch and trigger once in this for loop? There is something I am definitely missing with the foreachBatch option here, because without it Auto Loader is able to isolate the streams to folder 0 and folder 1, but with it, it is not.
CodePudding user response:
I spoke with a Databricks Solution Architect today, and he mentioned that I needed to use a ThreadPoolExecutor, which is outside Auto Loader and Databricks itself but native to Python. It goes in a helper function that specifies the number of streams that handle the tables in parallel with Auto Loader. That way, one can use a single instance of the Auto Loader notebook for multiple tables, which meets my use case. Thanks!
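A minimal sketch of that ThreadPoolExecutor pattern, with the Spark-specific parts stubbed out: start_table_stream is a hypothetical stand-in for the per-table Auto Loader logic (readStream/writeStream/awaitTermination), and table_configs is assumed to come from something like the list_of_files() helper in the question.

```python
from concurrent.futures import ThreadPoolExecutor

def start_table_stream(config):
    # In the real notebook this would build the readStream/writeStream for
    # one table from config["raw_data"], config["checkpoint"], and
    # config["target_location"], then block on awaitTermination().
    return f"started stream for {config['target_location']}"

# Assumed shape of the per-table configuration (paths are illustrative).
table_configs = [
    {"raw_data": "/landing/folder0",
     "checkpoint": "/staging/folder0/checkpoint",
     "target_location": "/staging/folder0/table"},
    {"raw_data": "/landing/folder1",
     "checkpoint": "/staging/folder1/checkpoint",
     "target_location": "/staging/folder1/table"},
]

# Each table runs in its own worker thread, and each worker receives its own
# config as an argument rather than sharing one loop variable.
with ThreadPoolExecutor(max_workers=len(table_configs)) as pool:
    results = list(pool.map(start_table_stream, table_configs))
```

Because every worker gets its config passed in explicitly, each stream keeps its own checkpoint and target_location for the lifetime of the run.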