Spring cloud stream RabbitMQ - bind DLQ with an exchange using one routing key

Time:01-05

I am using Spring Cloud Stream version 3.0.6.RELEASE.

I have an existing exchange called my.queue.exchange. My application contains one consumer. I want to create a queue called MY_QUEUE and bind it to the my.queue.exchange exchange. Additionally, I want to republish failed messages to a DLQ called MY_QUEUE_DLQ.

My problem is that the dead letter queue MY_QUEUE_DLQ is bound to the my.queue.exchange.dlx exchange with two routing keys instead of one: the first with routing key my.queue.rkey.dlx and the second with routing key MY_QUEUE.

My consumer bean:

@Bean
public Consumer<Dto> consumeFunction() {
    return dto -> {
        // do stuff
    };
}

My application.yml:

spring:
  cloud:
    stream:
      function:
        definition: consumeFunction
      rabbit:
        bindings:
          consumeFunction-in-0:
            consumer:
              autoBindDlq: true
              deadLetterQueueName: MY_QUEUE_DLQ
              deadLetterExchange: my.queue.exchange.dlx
              deadLetterRoutingKey: my.queue.rkey.dlx
              deadLetterExchangeType: topic
              declareExchange: false
              bindQueue: true
              queueNameGroupOnly: true
              bindingRoutingKey: 'my.queue.rkey'
      bindings:
        consumeFunction-in-0:
          destination: my.queue.exchange
          group: MY_QUEUE

Answer:

You need to set republishToDlq: false on the rabbit consumer properties. In RabbitExchangeQueueProvisioner, the following code adds the second binding (see the comment):

if (properties instanceof RabbitConsumerProperties
        && ((RabbitConsumerProperties) properties).isRepublishToDlq()) {
    /*
     * Also bind with the base queue name when republishToDlq is used, which
     * does not know about partitioning
     */
    declareBinding(dlqName, new Binding(dlq.getName(), DestinationType.QUEUE,
            dlxName, baseQueueName, arguments));
}

You can also get more details here as to why.
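
As a minimal sketch, this is where the republishToDlq setting would sit in the application.yml from the question. It reuses the binding name and DLQ properties shown there and changes nothing else; treat it as an illustration rather than a verified configuration for your setup.

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          consumeFunction-in-0:
            consumer:
              autoBindDlq: true
              # With republishToDlq disabled, the binder should skip the extra
              # DLQ binding that uses the base queue name (MY_QUEUE).
              republishToDlq: false
              deadLetterQueueName: MY_QUEUE_DLQ
              deadLetterExchange: my.queue.exchange.dlx
              deadLetterRoutingKey: my.queue.rkey.dlx
              deadLetterExchangeType: topic
```

Two caveats. With republishToDlq: false, rejected messages are dead-lettered unchanged by RabbitMQ itself, so they will not carry the exception headers that the binder adds when it republishes. Also, a binding that was already declared on the broker is not removed by a restart; the existing MY_QUEUE binding on my.queue.exchange.dlx most likely has to be deleted manually.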

Also, I see you are using

spring:
  cloud:
    stream:
      function:
        definition: consumeFunction

Please change it to

spring:
  cloud:
    function:
      definition: consumeFunction

because spring.cloud.stream.function.definition is deprecated and has already been removed in version 3.2.
