I am new to Spring Boot and @KafkaListener. My application receives almost 200K messages per second on a topic, and I want to separate the message listener from the processing of the messages. How can I use a java.util.concurrent.BlockingQueue with @KafkaListener? Can I do it with CompletableFuture?
Any sample code would help.
CodePudding user response:
It might not be the answer you are looking for, but what you are asking for is an anti-pattern in messaging. What is the point of moving data from the broker into memory? Shifting records into an internal queue defeats the purpose of the messaging middleware altogether. It also just pushes the problem a little further along: too many messages will still be there when you eventually can't keep up with that internal queue, and in the end it will be an out-of-memory error.
It is better to think about how to process the records as fast as you can and not pull more data from the Kafka topic until the current batch is done. That way no records are lost, because they remain on the broker, and there is no failure from running out of memory.
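For example, in Spring Kafka this usually comes down to tuning the listener container rather than adding a queue: cap how much each poll pulls and scale with container threads instead. A minimal sketch, assuming Spring Kafka with String records (the bootstrap server, group id, batch size, and concurrency values here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ThrottledConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Don't pull more records per poll than the listener can finish before the next poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Scale with container threads (bounded by the partition count), not an in-memory queue.
        factory.setConcurrency(4);
        return factory;
    }
}
```

Unprocessed records simply stay on the broker, so nothing is lost and nothing piles up in memory.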
That really was the main point of messaging middleware originally: decouple the producer and the consumer and let each work at its own pace. Whatever is not processed yet stays on the broker and can be consumed later.
CodePudding user response:
I believe you want your consumer to have pipelining implemented. It's not uncommon to implement this in a scenario like yours. Why? Well, the KafkaConsumer does its decompressing and deserializing on the polling thread, and that can be time-consuming even before you account for the processing itself. Since these operations are stacked behind one thread, it would be ideal to separate the polling from the processing, which is achieved through a couple of buffers.
One way to do this:
Your EventReceiver spins up a thread for the polling. That thread does the same thing you always do, but instead of firing off the listeners for each event, it passes each record from the poll loop into a receivedEvents buffer, which could be a BlockingQueue<ReceivedEvent>. Once the loop is over, the polling thread drains a second buffer, something like a Queue<Map<TopicPartition, OffsetAndMetadata>>, and commits the offsets that the processingThread has successfully processed.
Next, your EventReceiver spins up another thread, the processingThread. This one handles pulling records from the buffer, firing each event to all the listeners for this receiver, and then updating the offset queue's state for the pollingThread to commit.
Why doesn't the processingThread just commit the offsets instead of passing them back to the pollingThread? Because KafkaConsumer is not thread-safe: the same thread that calls .poll() must be the one that calls consumer.commitAsync(...), or else you'll get a concurrency exception (ConcurrentModificationException).
This approach doesn't work with auto commit enabled; the consumer has to be configured with enable.auto.commit=false.
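Putting it together, here is a minimal sketch of that design using the plain KafkaConsumer rather than Spring Kafka. The class and method names (PipelinedReceiver, process(...)), the topic, the buffer size, and the poll timeout are illustrative assumptions, and the consumer is assumed to be created with enable.auto.commit=false:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PipelinedReceiver {

    // Bounded buffer between the two threads; blocking on put() gives natural back-pressure.
    private final BlockingQueue<ConsumerRecord<String, String>> receivedEvents =
            new LinkedBlockingQueue<>(10_000);

    // Offsets the processing thread has finished, waiting for the polling thread to commit.
    private final Queue<Map<TopicPartition, OffsetAndMetadata>> pendingCommits =
            new ConcurrentLinkedQueue<>();

    // Assumes the consumer was created with enable.auto.commit=false.
    public void start(KafkaConsumer<String, String> consumer, String topic) {
        Thread pollingThread = new Thread(() -> {
            consumer.subscribe(List.of(topic));
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        receivedEvents.put(record); // blocks when the buffer is full
                    }
                    // Commit whatever the processing thread has finished since the last poll.
                    Map<TopicPartition, OffsetAndMetadata> offsets;
                    while ((offsets = pendingCommits.poll()) != null) {
                        consumer.commitAsync(offsets, null);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pollingThread");

        Thread processingThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecord<String, String> record = receivedEvents.take();
                    process(record); // fire your listeners / business logic here
                    // Queue the next offset to commit (committed offset = last processed + 1).
                    pendingCommits.add(Map.of(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processingThread");

        pollingThread.start();
        processingThread.start();
    }

    private void process(ConsumerRecord<String, String> record) {
        // placeholder for the actual event handling
    }
}
```

Only the pollingThread ever touches the consumer, which satisfies the single-thread rule above, and the bounded BlockingQueue is what provides back-pressure instead of an unbounded in-memory pile-up.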
In terms of how to do this with Spring Kafka, I'm not completely sure. I do know that Spring Kafka separates the EventReceiver from the EventListener (@KafkaListener), i.e. the low-level Kafka work from the business logic. In theory you could tune its implementation, but I think implementing this without the Spring Kafka library would be easier.