How to process a large delta table with UDF?


I have a Delta table with about 300 billion rows. I am now applying a UDF to one of its columns to create another column.

My code looks something like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def my_udf(data):
    # placeholder for the actual per-value transformation
    return data

udf_func = udf(my_udf, StringType())
data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', udf_func(data.value))

The issue is that this takes a long time, since Spark will process all 300 billion rows and only then write the output. Is there a way to do some micro-batching and write the output of those batches to the destination Delta table regularly?

CodePudding user response:

The first rule is usually to avoid UDFs as much as possible - what kind of transformation do you need to perform that isn't available in Spark itself?
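
For illustration only (assuming, hypothetically, that the per-value logic is something like upper-casing a string), a built-in function does the same work entirely inside the JVM, with no Python serialization at all:

# Hypothetical sketch: replace the UDF with a built-in function if one exists
from pyspark.sql import functions as F

data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', F.upper(F.col('value')))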

Second rule - if you can't avoid using a UDF, at least use a Pandas UDF, which processes data in batches and doesn't carry as much serialization/deserialization overhead - regular UDFs handle data row by row, encoding and decoding the data for each row.
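
A minimal Series-to-Series Pandas UDF sketch, assuming the per-value logic is a simple string transformation (the body below is only a placeholder for whatever your real UDF does):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def my_pandas_udf(values: pd.Series) -> pd.Series:
    # operates on a whole batch (a pandas Series) at once instead of row by row
    return values.astype(str).str.upper()  # placeholder transformation

data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', my_pandas_udf(data.value))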

If your table was built up over time and consists of many files, you can try Spark Structured Streaming with Trigger.AvailableNow (requires DBR 10.3 or 10.4), something like this:

maxNumFiles = 10  # max number of files processed in each micro-batch

df = spark.readStream \
  .option("maxFilesPerTrigger", maxNumFiles) \
  .table("large_table")
df = df.withColumn('new_column', udf_func(df.value))
df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

This will read the source table chunk by chunk, apply your transformation, and write the data into the destination table, committing each micro-batch as it completes rather than only at the end.
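
If you do keep a UDF, the two suggestions combine naturally: apply the Pandas UDF inside the streaming pipeline so each micro-batch is also processed in vectorized chunks. A rough sketch, reusing the hypothetical my_pandas_udf from above:

from pyspark.sql import functions as F

df = spark.readStream \
  .option("maxFilesPerTrigger", 10) \
  .table("large_table") \
  .withColumn('new_column', my_pandas_udf(F.col('value')))

query = df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

query.awaitTermination()  # blocks until all currently available files are processed

Because every micro-batch is committed to my_destination_table as it finishes, results become visible incrementally instead of only after all 300 billion rows are done, which is effectively the micro-batching behaviour asked about.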
