@RabbitListener needs to listen to a queue that another project has not created yet on first startup

Time:11-23

I need to listen to a RabbitMQ queue that is implemented in another microservice. Assume that, the first time, we switch to communicating via streaming instead of REST, and the other application has not started yet, so its queue does not exist in RabbitMQ. When @RabbitListener sees that, for example, the 'accountingResponseChannel.account' queue is missing, it tries to create it, but that queue is supposed to be created by the other microservice with specific options. According to this post, you cannot modify an existing queue, so creating the queue myself just to avoid the 404 error is not acceptable.

So my question is: how can I stop @RabbitListener from trying to create the missing queue when the queue does not exist? At the moment it fails with an error.

My related code, if required:

@Configuration
@EnableRabbit
@EnableBinding(AccountProcessor.class)
public class SwitchStreamConfig {

    @Bean
    public Queue accQueue() {
        return new Queue(AccountProcessor.ACCOUNTING_CHANNEL, true);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }
}

public interface AccountProcessor {
    String ACCOUNTING_CHANNEL = "accountingChannel";
    String ACCOUNTING_RESPONSE_CHANNEL = "accountingResponseChannel";

    @Output(ACCOUNTING_CHANNEL)
    MessageChannel output();

    @Input(ACCOUNTING_CHANNEL)
    SubscribableChannel input();
}

public void transferMoneyStream(TransactionRequest transactionRequest) {
    rabbitTemplate.convertAndSend(AccountProcessor.ACCOUNTING_CHANNEL + ".account", transactionRequest);
}

@RabbitListener(queues = AccountProcessor.ACCOUNTING_RESPONSE_CHANNEL + ".account")
public void transferMoneyResponse(TransactionResponse transferOk) {
    merchantRequestService.updateStatusAfterStream(transferOk.getTransactionId());
}

And the YAML config:

spring:
 cloud:
    stream:
      bindings:
        accountingChannel:
          group: account
          binder: rabbitmq
          consumer:
            maxAttempts: 10
      rabbit:
        bindings:
          accountingChannel:
            producer:
              auto-bind-dlq: true
            consumer:
              auto-bind-dlq: true
              dlq-ttl: 5000
              dlq-dead-letter-exchange:
              republishToDlq: true
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: master_rabbitmq
                port: 5672
                username: guest
                password: guest

And the exception:

Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[accountingResponseChannel.account]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:722)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:607)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:594)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1357)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1184)
    at com.sun.proxy.$Proxy341.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'accountingResponseChannel.account' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
    ... 14 more

Library versions: Spring Boot 2.3.12.RELEASE and Spring Cloud Hoxton.SR12.

Thanks in advance.

CodePudding user response:

You can simply create the queue yourself:

@Bean
public Queue accountingResponseChannelQueue() {
    return new Queue("accountingResponseChannel.account", true);
}

But you need to make sure the other microservice binds this queue to its exchange with the appropriate topic (routing key). Otherwise your listener will get no data from the queue.

CodePudding user response:

Set auto startup to false on the @RabbitListener.

Use a RabbitAdmin (Boot auto configures one) to test for the queue being present - admin.getQueueProperties().

    /**
     * Returns 3 properties {@link #QUEUE_NAME}, {@link #QUEUE_MESSAGE_COUNT},
     * {@link #QUEUE_CONSUMER_COUNT}, or null if the queue doesn't exist.
     */
    public Properties getQueueProperties(final String queueName) {

When the queue is present, use the RabbitListenerEndpointRegistry bean to start the container. Give the @RabbitListener an id and then registry.getListenerContainer("listenerId").start().
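The steps above could be sketched roughly as follows. This is only a minimal illustration in plain Java, not a definitive implementation: the class name DeferredListenerStarter, the listener id "accountingResponseListener", and the polling interval are assumptions made for the example. The queue check and the container start are passed in as callbacks so the sketch stays self-contained; in the real application they would wrap admin.getQueueProperties(...) and registry.getListenerContainer(...).start(), as shown in the comments.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
 * Hypothetical helper: polls until a queue exists, then starts the listener once.
 *
 * Assumed wiring in the Spring application (names are illustrative):
 *   @RabbitListener(id = "accountingResponseListener", autoStartup = "false",
 *                   queues = AccountProcessor.ACCOUNTING_RESPONSE_CHANNEL + ".account")
 *   probe = () -> rabbitAdmin.getQueueProperties("accountingResponseChannel.account");
 *   start = () -> registry.getListenerContainer("accountingResponseListener").start();
 */
final class DeferredListenerStarter {

    private final Supplier<Properties> probe; // returns null while the queue is missing
    private final Runnable start;             // starts the listener container
    private final AtomicBoolean started = new AtomicBoolean(false);

    DeferredListenerStarter(Supplier<Properties> probe, Runnable start) {
        this.probe = probe;
        this.start = start;
    }

    /** Runs one check; returns true once the listener has been started. */
    boolean checkOnce() {
        if (started.get()) {
            return true;
        }
        if (probe.get() == null) {
            return false; // queue not declared yet by the other service
        }
        if (started.compareAndSet(false, true)) {
            start.run();
        }
        return true;
    }

    /** Repeats checkOnce() with a fixed delay and stops polling after success. */
    ScheduledExecutorService pollEvery(long delay, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                if (checkOnce()) {
                    scheduler.shutdown();
                }
            } catch (RuntimeException e) {
                // e.g. broker unreachable: swallow and retry on the next tick
            }
        }, 0, delay, unit);
        return scheduler;
    }
}
```

With this in place, something like new DeferredListenerStarter(probe, start).pollEvery(5, TimeUnit.SECONDS) could be called once the application is up, so the listener only starts consuming after the other microservice has declared its queue.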

But @mystery is correct; producers don't normally create queues; they send to an exchange with a routing key; they don't know about the destination queue.
