I am writing a custom structured streaming source and can't figure out how batch sizes can be limited. The new MicroBatchStream interface provides the planInputPartitions method, which gets called with the return value of latestOffset as the end offset. It then returns a partitioning of the data up to that latest offset, to be processed in a single batch.
When I start a new streaming query, this results in an enormously large first batch, because all historic data gets crammed into it.
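Stripped down, my stream looks roughly like this (MyOffset and newestAvailableIndex() are simplified placeholders for my real offset type and backing system):

```scala
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}

// Simplified placeholder offset: a single increasing record index.
case class MyOffset(index: Long) extends Offset {
  override def json(): String = index.toString
}

class MyMicroBatchStream extends MicroBatchStream {
  // Spark uses this as the end offset of the next batch, so on a fresh query
  // the first batch spans everything from initialOffset() up to here.
  override def latestOffset(): Offset = MyOffset(newestAvailableIndex())

  override def initialOffset(): Offset = MyOffset(0L)
  override def deserializeOffset(json: String): Offset = MyOffset(json.toLong)
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}

  // Called with initialOffset() and latestOffset() on the first batch,
  // i.e. all historic data ends up in one batch.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = ???
  override def createReaderFactory(): PartitionReaderFactory = ???

  protected def newestAvailableIndex(): Long = ??? // placeholder: newest record in the source
}
```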
I already tried manually limiting the batch size by gradually increasing the latestOffset depending on what has already been committed. When restarting a query from a checkpoint, however, this fails as nothing has been committed yet.
Is there an (obvious) way of limiting streaming batch sizes?
CodePudding user response:
You can use the SupportsAdmissionControl interface for this purpose. It gives you the method Offset latestOffset(Offset startOffset, ReadLimit limit);, where startOffset is the end offset of the previous batch. This way you can apply the size limit while you calculate the latestOffset you return. Depending on your needs, you don't necessarily need to use the ReadLimit parameter - in our case we just apply a predefined threshold. The important part for you is the startOffset parameter.
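As a rough sketch, building on the simplified MyMicroBatchStream from the question and with maxRecordsPerBatch standing in for the predefined threshold, it could look like this:

```scala
import org.apache.spark.sql.connector.read.streaming.{Offset, ReadLimit, SupportsAdmissionControl}

// Sketch only: reuses the placeholder MyOffset / MyMicroBatchStream from above.
class ThrottledMicroBatchStream(maxRecordsPerBatch: Long)
    extends MyMicroBatchStream with SupportsAdmissionControl {

  // startOffset is the end offset of the previous batch, or initialOffset()
  // right after a (re)start from a checkpoint, so the cap also applies before
  // anything has been committed in the current run.
  override def latestOffset(startOffset: Offset, limit: ReadLimit): Offset = {
    val start  = startOffset.asInstanceOf[MyOffset].index
    val newest = newestAvailableIndex()
    // Apply our own fixed threshold; you could instead inspect the ReadLimit
    // parameter (e.g. ReadMaxRows) if you want Spark to drive the limit.
    MyOffset(math.min(newest, start + maxRecordsPerBatch))
  }

  // Once SupportsAdmissionControl is implemented, Spark calls the two-argument
  // variant above, so the no-argument latestOffset() is no longer used.
  override def latestOffset(): Offset =
    throw new UnsupportedOperationException(
      "latestOffset(Offset, ReadLimit) should be called instead")
}
```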
Then planInputPartitions will be called with the correct start and end offsets, which were calculated using your size limit.
You can see an example of that in the Kafka DataSource implementation.