We are using RabbitMQ with the default Spring Boot configuration. For one of our listeners we want no parallelism: only one consumer thread should be running at any given point in time. The messages in this use case must be consumed in order, and with multiple threads per consumer they could be processed out of order.
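To illustrate the ordering concern, here is a minimal plain-Java sketch. It does not use RabbitMQ or Spring at all; the class OrderedConsumerSketch and the method consume are hypothetical names made up for this illustration, and a thread pool stands in for the listener's consumer threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedConsumerSketch {

    // Hands every message to a pool of "consumer" threads and records
    // the order in which the messages were actually processed.
    static List<Integer> consume(List<Integer> messages, int consumerThreads) {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumerThreads);
        for (Integer message : messages) {
            pool.submit(() -> processed.add(message));
        }
        pool.shutdown();
        try {
            // Wait for all submitted messages to be processed.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumers", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> messages = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        // One thread drains its queue in FIFO order, so the input order is preserved.
        System.out.println("1 consumer thread:  " + consume(messages, 1));
        // With several threads the processing order may differ from run to run.
        System.out.println("4 consumer threads: " + consume(messages, 4));
    }
}
```

With a single thread the processed list always matches the input order; with four threads every message is still processed, but nothing guarantees the order.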
Since we are using the defaults and have not explicitly tweaked the container, we are using the SimpleMessageListenerContainer. After looking at the documentation, I tried fixing the number of consumers using concurrency = "1". The annotation on the target method looks like this: @RabbitListener(queues = ["queue-name"], concurrency = "1").
As per the documentation, this should have ensured that there is only one consumer thread:
{@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed number of consumers in the {@code concurrentConsumers} property
2021-10-29 06:11:26.361 INFO 29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx : Created xxx
2021-10-29 06:11:26.383 INFO 29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx : Created xxx
The thread IDs to note here are [ntContainer#4-1] and [ntContainer#0-1].
So the question is: how can we ensure that there is only one thread per consumer at any given point in time?
CodePudding user response:
Try this:
@RabbitListener(queues = ["queue-name"], exclusive = true)
If you are using a custom factory:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    // Start with one consumer and never scale beyond one.
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(1);
    return factory;
}
See https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer for details.
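If you do not define a custom factory and rely on the factory that Spring Boot auto-configures (an assumption about your setup), the same limits can be set through Boot's listener properties. A minimal sketch for application.properties:

```properties
# Fix the auto-configured simple listener container at exactly one consumer.
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=1
```

Note that these properties apply to every listener created by the default factory, not just one queue, so the per-listener annotation attribute or a dedicated factory may fit better if only a single listener needs to be serialized.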
Also, the next time you have a question about Spring AMQP, you can add the spring-amqp tag (https://stackoverflow.com/questions/tagged/spring-amqp) to your question.