Home > Software engineering >  Limit Batch Size in Apache Spark 3.0 Structured Streaming - MicroBatchStream
Limit Batch Size in Apache Spark 3.0 Structured Streaming - MicroBatchStream

Time:05-23

I am writing a custom structured streaming source and can't figure out how batch sizes can be limited. The new MicroBatchStream interface provides the planInputPartitions method that gets called with the return value of latestOffset as end Offset. It then returns a partitioning of the data up to the provided latest offset to be processed in a single batch.

When I start a new streaming query this results in an enormously large first batch as all historic data gets crammed into a single batch.

I already tried manually limiting the batch size by gradually increasing the latestOffset depending on what has already been committed. When restarting a query from a checkpoint, however, this fails as nothing has been committed yet.

Is there a (obvious) way of limiting streaming batch sizes?

CodePudding user response:

You can use SupportsAdmissionControl interface for this purpose. This gives you the Offset latestOffset(Offset startOffset, ReadLimit limit); method that allows you to get the startOffset which is actually the endOffset of the previous batch. This way you can apply the size limit before you return the latestOffset response when you calculate it. Depending on your needs, you don't necessarily need to use the ReadLimit parameter - in our case we just have a predefined threshold which we use. The important part for you is the startOffset parameter.

Then, planInputPartitions will be called with the correct start and end offsets, which was caluclated using your size limit.

You can see an example for that in Kafka DataSource implmentation - see in here.

  • Related