I have a use case where we have a strict need that all of the queues have a dedicated thread and consumer, which means each thread will only serve one queue.
I have read this accepted answer and the comment by Gary Russell. He mentioned creating a child application context for having all the features of @RabbitListener
for achieving what I am trying. Still I didn't get how to do this when the queues are dynamically added during runtime.
If possible please point me to the relevant article for solving this in both the application context way(also how can I create the child contexts) and the MessageListenerAdapter
way as advised.
CodePudding user response:
Here is some solution based on Spring Boot and child application context declaration.
I have
in the component which will be declared in the child context:@EnableRabbit public class DynamicRabbitListener { @Bean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @RabbitListener(queuesToDeclare = @Queue(name = "${dynamic.queue}", autoDelete = "true")) public void listen(String payload, @Header(AmqpHeaders.CONSUMER_QUEUE) String queueName) { System.out.printf("Received %s from queue %s%n", payload, queueName); } }
We need
to register respective annotation processor to the child application context and be able to trigger the proper lifecycle.W need
in this context to be able to declare dynamic queues.Both of those aspects give us some isolation in the processing and logic.
Now this is how I declare those contexts:
@Bean ApplicationRunner applicationRunner(ConfigurableApplicationContext parent, RabbitTemplate rabbitTemplate) { return args -> { for (int i = 0; i < 10; i ) { String dynamicQueue = "dynamicQueue#" i; AnnotationConfigApplicationContext childApplicationContext = new AnnotationConfigApplicationContext(); childApplicationContext.setParent(parent); childApplicationContext.register(DynamicRabbitListener.class); ConfigurableEnvironment environment = parent.getEnvironment(); MapPropertySource propertySource = new MapPropertySource("dynamic.queues", Map.of("dynamic.queue", dynamicQueue)); environment.getPropertySources().addLast(propertySource); childApplicationContext.setEnvironment(environment); childApplicationContext.refresh(); rabbitTemplate.convertAndSend(dynamicQueue, "test data #" i); } }; }
Pay attention to the propertySource
to achieve a dynamic requirements for every child application context with its specific @RabbitListener
The output of my program is like this:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
:: Spring Boot :: (v2.7.0)
Received test data #0 from queue dynamicQueue#0
Received test data #1 from queue dynamicQueue#1
Received test data #2 from queue dynamicQueue#2
Received test data #3 from queue dynamicQueue#3
Received test data #4 from queue dynamicQueue#4
Received test data #5 from queue dynamicQueue#5
Received test data #6 from queue dynamicQueue#6
Received test data #7 from queue dynamicQueue#7
Received test data #8 from queue dynamicQueue#8
Received test data #9 from queue dynamicQueue#9