We are facing an issue with reading and writing streaming data into the target location.
we are working with some JSON telemetry data for tracking steps. New data files land in our delta lake every 5 seconds. Need a way that automatically ingests into delta lake.
CodePudding user response:
Hope this helps
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <schemaLocation>)
.load(<dataset_source>)
.writeStream
.format("delta")
.option("checkpointLocation", <checkpoint_path>)
.trigger(processingTime="<Provide the time>")
.outputMode("append") # you can use complete if needed
.table("table_name"))
For more info refer: https://docs.databricks.com/ingestion/auto-loader/index.html
CodePudding user response:
if you want to read particular sub folder. For Example: This is my file location /mnt/2023/01/13
.I am want to read 2023/01
inside data, then load data like thisload('/mnt/<folder>/<sub_folder>')
or /mnt/2023/*
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <Location>)
.load('/mnt/<folder>/<sub_folder>')