We have an application that will use RabbitMQ. The design is a single exchange and a single queue with multiple routing keys, one per team, and all teams communicate through this one queue. I'm developing a Java application that should listen to that queue for only the routing key assigned to my team.
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queue", durable = "true"),
        exchange = @Exchange(value = "exchange", autoDelete = "false", type = "topic"),
        key = "abc_rk"))
public void consumeMessagesFromRabbitMQ(Request request) throws InterruptedException {
    System.out.println("Start:Request from RabbitMQ: " + request);
    Thread.sleep(10000L); // simulate 10 seconds of processing
    System.out.println("End:Request from RabbitMQ: " + request);
}
Let's say the queue holds messages with 3 different routing keys, and my application only wants to listen to abc_rk. When I run this code, it does not filter out the other messages: regardless of what I set in key, it pulls all the messages from the queue.
Note that I can't change the design and use the separate queue for each routingKey.
CodePudding user response:
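RabbitMQ doesn't work that way (unlike JMS, it has no concept of a message selector).
In fact, consumers know nothing about routing keys; only producers and bindings use them. The only reason you see key on @RabbitListener is to aid configuration: it declares the binding between the queue and the exchange, and it does not filter what the listener receives from the queue.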
To do what you want, you need to bind 3 different queues to the exchange with the respective routing keys.
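As a rough sketch of what that could look like with Spring AMQP's declarative beans: the queue names abc_queue and xyz_queue and the routing key xyz_rk are hypothetical and only there for illustration; "exchange" and "abc_rk" come from the question. Only two of the three queues are shown.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PerTeamQueueConfig {

    // Same topic exchange as in the question (durable, not auto-delete by default).
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    // One durable queue per team; the queue names are hypothetical.
    @Bean
    public Queue abcQueue() {
        return new Queue("abc_queue");
    }

    @Bean
    public Queue xyzQueue() {
        return new Queue("xyz_queue");
    }

    // The binding is where the routing key matters: the broker copies a
    // message into a queue only if the binding key matches.
    @Bean
    public Binding abcBinding() {
        return BindingBuilder.bind(abcQueue()).to(exchange()).with("abc_rk");
    }

    // "xyz_rk" is a hypothetical routing key for another team.
    @Bean
    public Binding xyzBinding() {
        return BindingBuilder.bind(xyzQueue()).to(exchange()).with("xyz_rk");
    }
}
```

Each team's listener would then consume from its own queue, for example @RabbitListener(queues = "abc_queue"), and would only ever see messages published with its routing key.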
You wrote: "Note that I can't change the design and use the separate queue for each routingKey."
You could add a MessagePostProcessor to the container (afterReceivePostProcessors) to discard the unwanted messages by returning null. That is the only mechanism the framework provides for filtering messages.
/**
* Set {@link MessagePostProcessor}s that will be applied after message reception, before
* invoking the {@link MessageListener}. Often used to decompress data. Processors are invoked in order,
* depending on {@code PriorityOrder}, {@code Order} and finally unordered.
* @param afterReceivePostProcessors the post processor.
* @since 1.4.2
* @see #addAfterReceivePostProcessors(MessagePostProcessor...)
*/
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
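A minimal sketch of such a filter, assuming a Spring Boot style setup where a ConnectionFactory bean is available and where @RabbitListener picks up the container factory bean named rabbitListenerContainerFactory. The class name RoutingKeyFilterConfig is made up for this example; any message converter you rely on would also have to be set on this factory.

```java
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RoutingKeyFilterConfig {

    // Routing key from the question; everything else is discarded.
    private static final String WANTED_ROUTING_KEY = "abc_rk";

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Runs after a message is received and before the listener is invoked.
        // Returning null tells the container to discard the message.
        MessagePostProcessor routingKeyFilter = message ->
                WANTED_ROUTING_KEY.equals(message.getMessageProperties().getReceivedRoutingKey())
                        ? message
                        : null;

        factory.setAfterReceivePostProcessors(routingKeyFilter);
        return factory;
    }
}
```

Keep in mind that, as far as I know, a message discarded this way is still acknowledged and removed from the queue, so the other teams consuming from the same queue will never see it.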
But the best solution is 3 queues.