Increase the rabbitmq consumption of a message using Thread Pool in Spring Application

Time:07-06

AIM: Increase the message consumption rate at the consumer end using a thread pool in a Spring application (or any other recommended approach), with RabbitMQ as the message broker.

Scenario: Our application takes about 15 seconds to consume one message, while we produce one message per second. So by the time the first message has been consumed, after 15 seconds, 14 more messages are already waiting, and the backlog keeps growing.

Is there any way to reduce this gap between the producer and the consumer by increasing consumption at the consumer end?

Existing Understanding:

I tried to increase the thread pool size so that each consumer has 15 threads. I expected this to give the following behavior:

00:00:01 - 1st msg, picked by thread 1
00:00:02 - 2nd msg, picked by thread 2
00:00:03 - 3rd msg, picked by thread 3
00:00:04 - 4th msg, picked by thread 4
..... and so on
00:00:15 - 15th msg, picked by thread 15
00:00:16 - 16th msg, picked by thread 1 (processing of 1st msg done after 15 seconds)
00:00:17 - 17th msg, picked by thread 2 (processing of 2nd msg done after 15 seconds)
..... and so on
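
The timeline above follows from simple arithmetic: at 1 message per second and about 15 seconds per message, roughly 15 messages are in flight at any moment, so about 15 parallel workers are needed to keep up. A minimal sketch of that calculation follows. The class and method names are hypothetical and exist only to restate the numbers from the scenario; the backlog estimate assumes steady arrivals and fully busy workers.

```java
// Hypothetical helper, not part of Spring or RabbitMQ: it only restates the
// arithmetic behind the timeline above.
class ConsumerSizing {

    // Workers needed to keep up = arrival rate x processing time, rounded up.
    static int requiredWorkers(double messagesPerSecond, double secondsPerMessage) {
        return (int) Math.ceil(messagesPerSecond * secondsPerMessage);
    }

    // Rough backlog after elapsedSeconds, assuming steady arrivals and that
    // every worker is busy for the whole period.
    static long backlogAfter(long elapsedSeconds, double messagesPerSecond,
                             double secondsPerMessage, int workers) {
        double arrived = elapsedSeconds * messagesPerSecond;
        double processed = workers * (elapsedSeconds / secondsPerMessage);
        return Math.max(0L, Math.round(arrived - processed));
    }

    public static void main(String[] args) {
        System.out.println(requiredWorkers(1.0, 15.0));        // 15
        System.out.println(backlogAfter(150, 1.0, 15.0, 1));   // 140
        System.out.println(backlogAfter(150, 1.0, 15.0, 15));  // 0
    }
}
```

With a single worker the backlog grows by about 14 messages every 15 seconds, which matches the gap described in the scenario.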

Existing Implementation:

val factory = new SimpleRabbitListenerContainerFactory();
factory.setTaskExecutor(Executors.newFixedThreadPool(15));

Based on this understanding, I implemented the code above, but I don't see any significant improvement in the consumption rate at the consumer end. The consumption rate appears to be independent of the thread pool size.

Is the above implementation correct, or is something missing? Are there other ways to solve this issue?

CodePudding user response:

We can increase the consumption rate at the consumer end by configuring concurrent consumers together with the thread pool size. Let us define a few terms first:

1) Min Concurrent Consumers is the minimum number of concurrent consumers per listener.

2) Max Concurrent Consumers is the maximum number of concurrent consumers per listener.

3) Thread Pool Size is the total number of threads available to consumers on a connection. By default, RabbitMQ allows up to 2047 channels per connection, and this limit is shared across all listeners on that connection.

Note: Each consumer on a queue runs on its own channel, so for simplicity we can treat consumer and channel as equivalent here.

val factory = new SimpleRabbitListenerContainerFactory();
...
//Minimum Concurrent Consumer
factory.setConcurrentConsumers(1);
//Maximum Concurrent Consumer
factory.setMaxConcurrentConsumers(15);
//Thread Pool Size
factory.setTaskExecutor(Executors.newFixedThreadPool(15));
...

These fields can be set as shown in the snippet above.

In that snippet, the thread pool size is 15, and the minimum and maximum concurrent consumers are 1 and 15. Depending on the traffic on the queue, the container automatically scales up and down between the minimum (1) and the maximum (15) number of concurrent consumers. The thread pool can accommodate this because the maximum is less than or equal to the pool size.
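
To illustrate why a larger thread pool alone does not raise throughput, here is a small plain-JDK simulation. It does not use Spring or RabbitMQ: the "consumers" are simply loops pulling from an in-memory queue, and the class name, method name, and parameters are all hypothetical. It is only a sketch of the idea that parallelism is bounded by the number of consumers, while the pool merely supplies threads for them.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK simulation (assumption: stands in for a listener container).
class ConsumerPoolDemo {

    // Returns the highest number of messages that were being processed at once.
    static int maxParallelism(int poolSize, int consumers, int messages, long workMillis) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(consumers);

        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    // Each consumer handles one message at a time until the queue is empty.
                    while (queue.poll() != null) {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(workMillis); // stands in for the slow handler
                        inFlight.decrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 15 threads but a single consumer: messages are still handled one by one.
        System.out.println(maxParallelism(15, 1, 10, 20));
        // 15 threads and 15 consumers: many messages are handled in parallel.
        System.out.println(maxParallelism(15, 15, 60, 100));
    }
}
```

The first call mirrors the question's setup, where only the executor was enlarged: the pool has 15 threads, but a single consumer still handles one message at a time.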

What's the difference between Thread Pool Size and Max Concurrent Consumers?

When you have multiple listeners, these two values may differ. The table below covers a few scenarios:

| Scenario | Listener 1 | Listener 2 | Listener 3 | Max Consumers (per listener) | Thread Pool Size | Note |
|---|---|---|---|---|---|---|
| Only 1 listener is present, with Thread Pool capacity fully used | Present | Not Present | Not Present | 15 | 15 | Listener-1 will use a maximum of 15 threads from the Thread Pool |
| Only 1 listener is present, with Thread Pool capacity not fully used | Present | Not Present | Not Present | 15 | 20 | Listener-1 will use a maximum of 15 threads from the Thread Pool, and 5 threads will remain unused |
| Only 1 listener is present, but Thread Pool Size < Max Concurrent Consumers | Present | Not Present | Not Present | 15 | 5 | Application will not start up |
| 3 listeners are present, with all the threads utilized | Present | Present | Present | 15 | 45 | Each listener will scale to a maximum of 15 consumers based on traffic |
| 3 listeners are present, with not all the threads utilized | Present | Present | Present | 15 | 50 | Each listener will scale to a maximum of 15 consumers based on traffic, and 5 threads will remain unused |
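
The rule behind the table can be sketched as a simple check: the pool must hold at least as many threads as the listeners can collectively scale up to. The helper below is hypothetical (it is not a Spring API) and assumes, as the table does, that every listener has the same maximum.

```java
// Hypothetical helper that restates the table's rule; it is not a Spring API.
class ThreadPoolSizing {

    // Threads needed when every listener scales up to its maximum.
    static int requiredThreads(int listeners, int maxConsumersPerListener) {
        return listeners * maxConsumersPerListener;
    }

    // Threads left idle at full scale-out; a negative value means the pool is too small.
    static int spareThreads(int poolSize, int listeners, int maxConsumersPerListener) {
        return poolSize - requiredThreads(listeners, maxConsumersPerListener);
    }

    public static void main(String[] args) {
        // Each row: listeners, max consumers per listener, thread pool size.
        int[][] rows = { {1, 15, 15}, {1, 15, 20}, {1, 15, 5}, {3, 15, 45}, {3, 15, 50} };
        for (int[] r : rows) {
            int spare = spareThreads(r[2], r[0], r[1]);
            String verdict = spare < 0 ? "pool too small" : spare + " spare thread(s)";
            System.out.println(r[0] + " listener(s), pool " + r[2] + ": " + verdict);
        }
    }
}
```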

How can we change the Minimum Concurrent Consumers value for a particular listener?

@RabbitListener(...,concurrency = "5")

With this attribute, the listener always keeps at least 5 consumers, even when there is no traffic. With the previous code snippet, it would scale down to 1 consumer when traffic is low.
