AIM: Increase the message consumption rate at the consumer end using a thread pool in a Spring application (or any other recommended approach), with RabbitMQ as the message broker.
Scenario: Our application takes about 15 seconds to consume one message, while we produce one message per second. So by the time the 1st message has been consumed, 14 more messages are already waiting, and the backlog keeps growing.
Is there any way to close this gap between producer and consumer by increasing the consumption rate at the consumer end?
Existing Understanding:
I tried increasing the thread pool size so that each consumer has 15 threads. I expected this to give the following schedule:
00:00:01 - 1st msg, picked by thread 1
00:00:02 - 2nd msg, picked by thread 2
00:00:03 - 3rd msg, picked by thread 3
00:00:04 - 4th msg, picked by thread 4
..... and so on
00:00:15 - 15th msg, picked by thread 15
00:00:16 - 16th msg, picked by thread 1 (processing of 1st msg done after 15 seconds)
00:00:17 - 17th msg, picked by thread 2 (processing of 2nd msg done after 15 seconds)
..... and so on
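The arithmetic behind this schedule can be sketched in a few lines of plain Java. The class and method names are made up for illustration, and the model assumes a constant processing time and a constant arrival rate, which real traffic will not match exactly.

```java
public class RequiredConsumers {

    /**
     * Smallest number of parallel consumers whose combined throughput
     * keeps up with the arrival rate: ceil(processingSeconds * messagesPerSecond).
     */
    public static int requiredConsumers(double processingSeconds, double messagesPerSecond) {
        return (int) Math.ceil(processingSeconds * messagesPerSecond);
    }

    /** Messages still waiting after the given time when the given number of consumers is running. */
    public static long backlogAfter(long seconds, int consumers,
                                    double processingSeconds, double messagesPerSecond) {
        double produced = seconds * messagesPerSecond;
        double consumed = seconds * consumers / processingSeconds;
        return Math.max(0, Math.round(produced - consumed));
    }

    public static void main(String[] args) {
        // 15 s per message at 1 message per second needs 15 consumers.
        System.out.println(requiredConsumers(15, 1));      // 15
        // With a single consumer, 140 messages are waiting after 150 seconds.
        System.out.println(backlogAfter(150, 1, 15, 1));   // 140
        // With 15 consumers the backlog stays at zero.
        System.out.println(backlogAfter(150, 15, 15, 1));  // 0
    }
}
```

This only confirms that 15 parallel workers are enough on paper. Whether 15 workers are actually running is a separate question.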
Existing Implementation:
val factory = new SimpleRabbitListenerContainerFactory();
factory.setTaskExecutor(Executors.newFixedThreadPool(15));
Based on this understanding, I implemented the code above, but I don't see any significant improvement in the consumption rate at the consumer end. The consumption rate appears to be independent of the thread pool size.
Is the above implementation correct, or is something missing? Are there other ways to solve this?
CodePudding user response:
We can increase the consumption rate at the consumer end by raising the number of concurrent consumers together with the thread pool size. A larger thread pool alone has no effect, because the listener container starts only one consumer by default and therefore uses only one thread from the pool. Let us define a few terms first:
1) Min Concurrent Consumers is the minimum number of concurrent consumers per listener.
2) Max Concurrent Consumers is the maximum number of concurrent consumers per listener.
3) Thread Pool Size is the total number of threads available to consumers. Each consumer occupies one thread and one channel, and the pool is shared across all listeners that use it. By default, RabbitMQ allows a maximum of 2047 channels per connection.
Note: Each consumer runs on its own channel, so for simplicity we can treat consumer and channel as equivalent here.
val factory = new SimpleRabbitListenerContainerFactory();
...
//Minimum Concurrent Consumer
factory.setConcurrentConsumers(1);
//Maximum Concurrent Consumer
factory.setMaxConcurrentConsumers(15);
//Thread Pool Size
factory.setTaskExecutor(Executors.newFixedThreadPool(15));
...
We can set these fields as shown in the snippet above.
In the snippet, the thread pool size is 15, and the minimum and maximum concurrent consumers are 1 and 15. Based on the traffic on the queue, the container automatically scales up and down between the minimum (1) and the maximum (15). The thread pool can accommodate this because the maximum does not exceed the pool size.
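To see why the pool size alone does not change throughput, here is a simplified stdlib-only model. It does not use Spring AMQP. It only assumes, as a rough approximation of the container, that one long-running task is submitted to the executor per concurrent consumer. `PoolVersusConsumers` and `threadsUsed` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolVersusConsumers {

    /**
     * Submits one long-running "consumer" task per concurrent consumer and
     * returns how many distinct pool threads are occupied by those tasks.
     */
    public static int threadsUsed(int poolSize, int concurrentConsumers) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> busyThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch started = new CountDownLatch(concurrentConsumers);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            for (int i = 0; i < concurrentConsumers; i++) {
                pool.execute(() -> {
                    busyThreads.add(Thread.currentThread().getName());
                    started.countDown();
                    try {
                        stop.await(); // stands in for the consumer's receive loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Bounded wait so the sketch cannot hang if consumers outnumber threads.
            started.await(2, TimeUnit.SECONDS);
            return busyThreads.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return busyThreads.size();
        } finally {
            stop.countDown();  // release the simulated consumers
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Pool of 15 but a single consumer: only one thread does any work.
        System.out.println(threadsUsed(15, 1));   // 1
        // Pool of 15 and 15 consumers: every thread is in use.
        System.out.println(threadsUsed(15, 15));  // 15
    }
}
```

In this model the executor only supplies threads, and the consumer count decides how many of them are ever occupied.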
What's the difference between Thread Pool Size and Max Concurrent Consumers?
When you have multiple listeners, the two values can differ. The table below covers a few scenarios:
Scenario | Listener 1 | Listener 2 | Listener 3 | Max Consumers (per listener) | Thread Pool Size | Note |
---|---|---|---|---|---|---|
Only 1 listener is present, with the thread pool fully used | Present | Not Present | Not Present | 15 | 15 | Listener-1 uses at most 15 threads from the pool |
Only 1 listener is present, with the thread pool not fully used | Present | Not Present | Not Present | 15 | 20 | Listener-1 uses at most 15 threads from the pool, and 5 threads stay unused |
Only 1 listener is present, but Thread Pool Size < Max Concurrent Consumers | Present | Not Present | Not Present | 15 | 5 | Application will not start up |
3 listeners are present, with all threads utilized | Present | Present | Present | 15 | 45 | Each listener runs at most 15 consumers, depending on traffic |
3 listeners are present, with not all threads utilized | Present | Present | Present | 15 | 50 | Each listener runs at most 15 consumers, depending on traffic, and 5 threads stay unused |
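The sizing rule behind the table can be checked with a small helper. This is a sketch with hypothetical names (`PoolSizing`, `spareThreads`). It assumes the worst case in which every listener scales up to its maximum at the same time.

```java
public class PoolSizing {

    /** Threads needed when every listener scales up to its maximum at once. */
    public static int threadsNeeded(int listeners, int maxConsumersPerListener) {
        return listeners * maxConsumersPerListener;
    }

    /** Idle threads left in the pool at peak load; a negative value means the pool is too small. */
    public static int spareThreads(int listeners, int maxConsumersPerListener, int poolSize) {
        return poolSize - threadsNeeded(listeners, maxConsumersPerListener);
    }

    public static void main(String[] args) {
        // Rows of the table above: listeners, max consumers per listener, pool size.
        System.out.println(spareThreads(1, 15, 15)); // 0
        System.out.println(spareThreads(1, 15, 20)); // 5
        System.out.println(spareThreads(1, 15, 5));  // -10, pool too small
        System.out.println(spareThreads(3, 15, 45)); // 0
        System.out.println(spareThreads(3, 15, 50)); // 5
    }
}
```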
How can we change the minimum number of concurrent consumers for a particular listener?
@RabbitListener(...,concurrency = "5")
With the above attribute, the listener always keeps at least 5 consumers, even when there is no traffic. With the previous snippet, it would drop to 1 consumer when traffic is low.
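As far as I know, the `concurrency` attribute also accepts a range of the form "m-n", which sets the minimum and maximum together. The sketch below is not Spring's own parsing code. It is a hypothetical stdlib-only helper that shows how such a value could be read, assuming that a single number keeps the maximum configured on the factory (15 in the snippet above).

```java
public class ConcurrencySpec {

    /**
     * Reads "5" or "5-10" and returns {min, max}.
     * For a single number, factoryMax is used as the maximum (an assumption of this sketch).
     */
    public static int[] parse(String concurrency, int factoryMax) {
        String[] parts = concurrency.trim().split("-", 2);
        int min = Integer.parseInt(parts[0].trim());
        int max = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : factoryMax;
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency: " + concurrency);
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        int[] single = parse("5", 15);
        System.out.println(single[0] + " to " + single[1]); // 5 to 15
        int[] range = parse("5-10", 15);
        System.out.println(range[0] + " to " + range[1]);   // 5 to 10
    }
}
```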