I need to listen to a RabbitMQ queue that is declared by another microservice.
Assume that we have just switched from REST to streaming, the other application has not started yet, and its queue does not exist in RabbitMQ yet.
In that case @RabbitListener sees that the queue (for example 'accountingResponseChannel.account') does not exist and tries to create it. But that queue will be created by the other microservice with specific options, and as this post explains, you cannot modify an existing queue, so creating the queue myself just to avoid the 404 error is not acceptable.
So my question is: how can I stop @RabbitListener from trying to create the missing queue when it does not exist? At the moment it fails with an error.
My related code, if required:
@Configuration
@EnableRabbit
@EnableBinding(AccountProcessor.class)
public class SwitchStreamConfig {

    @Bean
    public Queue accQueue() {
        return new Queue(AccountProcessor.ACCOUNTING_CHANNEL, true);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }
}

public interface AccountProcessor {

    String ACCOUNTING_CHANNEL = "accountingChannel";
    String ACCOUNTING_RESPONSE_CHANNEL = "accountingResponseChannel";

    @Output(ACCOUNTING_CHANNEL)
    MessageChannel output();

    @Input(ACCOUNTING_CHANNEL)
    SubscribableChannel input();
}
public void transferMoneyStream(TransactionRequest transactionRequest) {
    rabbitTemplate.convertAndSend(AccountProcessor.ACCOUNTING_CHANNEL + ".account", transactionRequest);
}

@RabbitListener(queues = AccountProcessor.ACCOUNTING_RESPONSE_CHANNEL + ".account")
public void transferMoneyResponse(TransactionResponse transferOk) {
    merchantRequestService.updateStatusAfterStream(transferOk.getTransactionId());
}
And the YAML config:
spring:
  cloud:
    stream:
      bindings:
        accountingChannel:
          group: account
          binder: rabbitmq
          consumer:
            maxAttempts: 10
      rabbit:
        bindings:
          accountingChannel:
            producer:
              auto-bind-dlq: true
            consumer:
              auto-bind-dlq: true
              dlq-ttl: 5000
              dlq-dead-letter-exchange:
              republishToDlq: true
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: master_rabbitmq
                port: 5672
                username: guest
                password: guest
and exception:
Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[accountingResponseChannel.account]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:722)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:607)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:594)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1357)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1184)
at com.sun.proxy.$Proxy341.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700)
... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'accountingResponseChannel.account' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 14 more
Library versions: Spring Boot 2.3.12.RELEASE and Spring Cloud Hoxton.SR12.
Thanks in advance.
CodePudding user response:
You can declare the queue yourself with a bean:
@Bean
public Queue accountingResponseChannelQueue() {
    return new Queue("accountingResponseChannel.account", true);
}
But you need to make sure the other microservice binds this queue to its exchange with the right routing key (topic). Otherwise your listener will never receive any messages from the queue.
CodePudding user response:
Set autoStartup to false on the @RabbitListener.
Use a RabbitAdmin (Boot auto-configures one) to test whether the queue is present, with admin.getQueueProperties():
/**
* Returns 3 properties {@link #QUEUE_NAME}, {@link #QUEUE_MESSAGE_COUNT},
* {@link #QUEUE_CONSUMER_COUNT}, or null if the queue doesn't exist.
*/
public Properties getQueueProperties(final String queueName) {
When the queue is present, use the RabbitListenerEndpointRegistry bean to start the container: give the @RabbitListener an id and then call registry.getListenerContainer("listenerId").start().
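The steps above can be sketched as a small polling helper. This is only an illustration under stated assumptions, not code from Spring AMQP: the class name DeferredListenerStarter and its methods are hypothetical, and the queue check and the container start are passed in as plain lambdas so the sketch compiles without Spring on the classpath. In a real application the two lambdas would wrap the admin.getQueueProperties(...) and registry.getListenerContainer("listenerId").start() calls mentioned above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls until the queue exists, then starts the listener exactly once. */
class DeferredListenerStarter {

    // Assumed wiring: () -> admin.getQueueProperties("accountingResponseChannel.account") != null
    private final BooleanSupplier queueExists;
    // Assumed wiring: () -> registry.getListenerContainer("listenerId").start()
    private final Runnable startListener;
    private boolean started;

    DeferredListenerStarter(BooleanSupplier queueExists, Runnable startListener) {
        this.queueExists = queueExists;
        this.startListener = startListener;
    }

    /** One polling step; returns true once the listener has been started. */
    synchronized boolean pollOnce() {
        if (!started && queueExists.getAsBoolean()) {
            startListener.run();
            started = true;
        }
        return started;
    }

    /** Polls with a fixed delay on a background thread and stops after the listener starts. */
    ScheduledExecutorService schedule(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                if (pollOnce()) {
                    scheduler.shutdown(); // no further polling needed
                }
            } catch (RuntimeException e) {
                // e.g. broker unreachable; swallow so the next tick retries
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With the listener declared as @RabbitListener(id = "listenerId", autoStartup = "false", queues = ...), something like new DeferredListenerStarter(check, start).schedule(5000) could be called once at application startup. The 5-second delay is an arbitrary choice for illustration.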
But @mystery is correct; producers don't normally create queues; they send to an exchange with a routing key; they don't know about the destination queue.