Spark Structured Streaming rate limit

Time:11-28

I am trying to control the number of records per trigger in Structured Streaming. Is there any function for it? I tried different properties, but nothing seems to work.

%scala 
import org.apache.spark.sql.streaming.Trigger

val checkpointPath = "/user/[email protected]/dbacademy/developer-foundations-capstone/checkpoint/orders"
// val outputPath = "/user/[email protected]/dbacademy/developer-foundations-capstone/raw/orders/stream"

val devicesQuery = df.writeStream
                     .outputMode("append")
                     .format("delta")
                     .queryName("orders")
                     .trigger(Trigger.ProcessingTime("1 second"))
                     .option("inputRowsPerSecond", 1)
                     .option("maxFilesPerTrigger", 1)
//                   .option("checkpointLocation", checkpointPath)
//                   .start(orders_checkpoint_path)
                     .option("checkpointLocation",checkpointPath)
                     .table("orders")

CodePudding user response:

Delta uses two options, maxFilesPerTrigger and maxBytesPerTrigger, to limit how much data a micro-batch reads. You already use the first one; when both are set, the micro-batch stops at whichever limit is reached first. The actual number of records processed per trigger depends on the size of the input files and the number of records in them, because Delta processes complete files and does not split them into smaller chunks.

However, these options need to be specified on the source Delta table (the read side), not on the sink as you do now:

spark.readStream.format("delta")
  .option("maxFilesPerTrigger", "1")
  .load("/delta/events")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "...")
  .table("orders")
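
To illustrate why maxFilesPerTrigger limits files rather than records, here is a minimal plain-Scala sketch (no Spark needed). It is only a simplified model, not Delta's actual implementation: the function `recordsPerTrigger` and the sample record counts are hypothetical, and each file is represented only by the number of records it holds.

```scala
// Simplified model (an assumption for illustration, not Delta's real code):
// each element of fileRecordCounts is the number of records in one input file.
def recordsPerTrigger(fileRecordCounts: Seq[Long], maxFilesPerTrigger: Int): Seq[Long] = {
  require(maxFilesPerTrigger > 0, "maxFilesPerTrigger must be positive")
  // Files are consumed whole and in order, at most maxFilesPerTrigger per micro-batch,
  // so the record count of a batch is just the sum over the files it picked up.
  fileRecordCounts.grouped(maxFilesPerTrigger).map(_.sum).toVector
}

// Hypothetical input: four files of very different sizes.
val files = Seq(10L, 2500L, 40L, 7L)

// With one file per trigger, the batch sizes follow the file sizes exactly.
val batches = recordsPerTrigger(files, maxFilesPerTrigger = 1)
println(batches.mkString(", ")) // prints "10, 2500, 40, 7"
```

So even with maxFilesPerTrigger set to 1, a single trigger can still carry thousands of records if the file is large. If you need a tighter cap on records, the input files themselves have to be smaller.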

CodePudding user response:

In Spark 3.x there is the 'maxOffsetsPerTrigger' option for Kafka sources.

https://www.waitingforcode.com/apache-spark-structured-streaming/does-maxoffsetspertrigger-guarantee-idempotent-processing/read is a good read.
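
As a rough sketch of how that option is set, assuming a Kafka source: like the Delta options, it goes on the read side. The broker address, the topic name and the limit of 1000 are placeholders, not values from the question, and the snippet assumes the spark-sql-kafka connector is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// In a notebook a session already exists; getOrCreate() simply returns it.
val spark = SparkSession.builder.appName("kafka-rate-limit-sketch").getOrCreate()

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "orders")                       // placeholder topic name
  .option("maxOffsetsPerTrigger", "1000")              // cap on offsets read per micro-batch (placeholder value)
  .load()
```

The limit applies to the total number of offsets across all subscribed topic partitions in one micro-batch, so it bounds the records read per trigger much more directly than a per-file limit does.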
