I have a simple Spring Boot application where I have the following settings for RabbitMQ (spring-boot-starter-amqp version is 2.7.0):
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: my_host
    username: admin
    password: password
    template:
      retry:
        enabled: true
        initial-interval: 3s
        max-interval: 10s
        multiplier: 2
        max-attempts: 3
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-interval: 10s
          multiplier: 2
          max-attempts: 3
A SimpleRabbitListenerContainerFactory is configured with a MessagePostProcessor:
@Bean
public SimpleRabbitListenerContainerFactory myInterceptContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                        final ConnectionFactory connectionFactory,
                                                                        final MyMessagePostProcessor myMessagePostProcessor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
        @Override
        protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
            instance.setAfterReceivePostProcessors(myMessagePostProcessor);
            super.initializeContainer(instance, endpoint);
        }
    };
    ((CachingConnectionFactory) connectionFactory).setRequestedHeartBeat(60);
    configurer.configure(factory, connectionFactory);
    return factory;
}
The MessagePostProcessor is very simple: if the version in the header is wrong, it throws an AmqpRejectAndDontRequeueException:
@Component
public class MyMessagePostProcessor implements MessagePostProcessor {
    private static final Logger LOGGER = LogManager.getLogger(MyMessagePostProcessor.class);

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        if (!"1.0".equalsIgnoreCase(message.getMessageProperties().getHeader("version"))) {
            LOGGER.error("wrong version");
            throw new AmqpRejectAndDontRequeueException("wrong version");
        }
        return message;
    }
}
When I send a message with a wrong version, it works properly based on the logs:
MyApp: 2022-07-05 15:49:33,675 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.p.MyMessagePostProcessor:22 - wrong version
MyApp: 2022-07-05 15:49:33,676 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.AmqpRejectAndDontRequeueException: wrong version
at com.my.app.queue.postprocess.MyMessagePostProcessor.postProcessMessage(MyMessagePostProcessor.java:23) ~[classes/:1.0.0]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1544) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
My consumer only logs one error and immediately throws AmqpRejectAndDontRequeueException, to see whether the message goes to the dead-letter queue without a retry:
@Component
public class MyMessageConsumer {
    private static final Logger LOGGER = LogManager.getLogger(MyMessageConsumer.class);

    @RabbitListener(queues = "my.queue", containerFactory = "myInterceptContainerFactory")
    public void consumerMessage() {
        LOGGER.error("error in consuming message");
        throw new AmqpRejectAndDontRequeueException("over");
    }
}
But the logs clearly show that it retries 3 times (my error log is printed 3 times), and only then is the message delivered to the dead-letter queue:
MyApp: 2022-07-05 15:50:32,182 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:46,603 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,617 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,618 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@4060893b(byte[75])' MessageProperties [headers={version=1.0, requestor=MyApp}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=my.queue, deliveryTag=2, consumerTag=amq.ctag-5Xx6EwwjVokRV6wD9xL8Og, consumerQueue=my.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
... 27 more
MyApp: 2022-07-05 15:50:52,620 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:77) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException
... 19 more
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
... 14 more
But I want to deliver the message to the dead-letter queue without retrying it 3 times.
I guess it has something to do with ListenerExecutionFailedException: Retry Policy Exhausted, but I don't understand why my AmqpRejectAndDontRequeueException is wrapped in this exception, because my MessagePostProcessor doesn't throw ListenerExecutionFailedException, only the exception that I throw in my code.
What am I missing here?
CodePudding user response:
As @Gary Russel suggested, I modified the retry policy as below:
@Bean
public RabbitRetryTemplateCustomizer customizeRetryPolicy(@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int maxAttempts) {
    SimpleRetryPolicy policy = new SimpleRetryPolicy(maxAttempts, Map.of(AmqpRejectAndDontRequeueException.class, false), true, true);
    return (target, retryTemplate) -> retryTemplate.setRetryPolicy(policy);
}
After adding only this new bean, everything works as expected!
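To illustrate why the last two constructor arguments (traverseCauses = true, defaultValue = true) matter here, below is a simplified plain-Java model of that kind of classification. It is not Spring Retry's code: the class RetryClassifierSketch and the two nested exception types are hypothetical stand-ins that only mimic the shape seen in the stack trace above, where the listener's exception arrives wrapped in a ListenerExecutionFailedException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model (an assumption, not the library implementation) of a retry
// classifier that can look through the cause chain of a thrown exception.
class RetryClassifierSketch {

    // Hypothetical stand-in for AmqpRejectAndDontRequeueException.
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) { super(message); }
    }

    // Hypothetical stand-in for the wrapper exception seen in the logs.
    static class ListenerExecutionFailed extends RuntimeException {
        ListenerExecutionFailed(String message, Throwable cause) { super(message, cause); }
    }

    private final Map<Class<? extends Throwable>, Boolean> retryable;
    private final boolean traverseCauses;
    private final boolean defaultValue;

    RetryClassifierSketch(Map<Class<? extends Throwable>, Boolean> retryable,
                          boolean traverseCauses, boolean defaultValue) {
        this.retryable = new LinkedHashMap<>(retryable);
        this.traverseCauses = traverseCauses;
        this.defaultValue = defaultValue;
    }

    // Returns the explicit mapping for the first matching exception in the
    // chain, or the default when nothing in the (inspected) chain is mapped.
    boolean shouldRetry(Throwable thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Boolean> entry : retryable.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    return entry.getValue();
                }
            }
            if (!traverseCauses) {
                break; // only the top-level exception is inspected
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Throwable wrapped = new ListenerExecutionFailed(
                "Listener method threw exception", new RejectAndDontRequeue("over"));

        RetryClassifierSketch traversing =
                new RetryClassifierSketch(Map.of(RejectAndDontRequeue.class, false), true, true);
        RetryClassifierSketch topLevelOnly =
                new RetryClassifierSketch(Map.of(RejectAndDontRequeue.class, false), false, true);

        System.out.println(traversing.shouldRetry(wrapped));   // false: the cause is found
        System.out.println(topLevelOnly.shouldRetry(wrapped)); // true: the wrapper hides the cause
        System.out.println(traversing.shouldRetry(new IllegalStateException("transient"))); // true: default
    }
}
```

In this model, the wrapped exception is only recognized as non-retryable when the cause chain is traversed, while every other exception falls back to the default and is still retried.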
CodePudding user response:
The container knows nothing about the exception until retries are exhausted.
You would need to customize the retry policy to not retry for that exception.
This is currently not possible using application properties; you would need to configure the retry interceptor as a bean and inject it into the container factory.
If you need help with that, I can add an example.
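As a rough illustration of the first two sentences, here is a plain-Java model of a retry loop wrapped around a listener call. It is only a sketch under assumptions, not Spring code: RetryLoopSketch, deliver, and the RejectAndDontRequeue stand-in are hypothetical names, and the back-off delay is left out. The point is that the retry logic sits between the container and the listener, so the failure only leaves the loop once the policy says to stop.

```java
import java.util.function.Predicate;

// Simplified model (an assumption, not the library implementation) of a
// retry loop that wraps a listener invocation.
class RetryLoopSketch {

    // Hypothetical stand-in for AmqpRejectAndDontRequeueException.
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) { super(message); }
    }

    // Invokes the listener until it succeeds, the policy refuses to retry,
    // or maxAttempts is reached. Returns the number of listener invocations.
    static int deliver(Runnable listener, int maxAttempts,
                       Predicate<Throwable> retryable, Runnable recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.run();
                return attempts;
            } catch (RuntimeException e) {
                if (attempts >= maxAttempts || !retryable.test(e)) {
                    // Only at this point does the failure leave the retry loop.
                    recoverer.run();
                    return attempts;
                }
                // A real retry template would wait for the back-off interval here.
            }
        }
    }

    public static void main(String[] args) {
        Runnable failingListener = () -> { throw new RejectAndDontRequeue("over"); };
        Runnable recoverer = () -> System.out.println("recoverer invoked, message rejected");

        // Policy that retries every exception: the listener runs 3 times.
        System.out.println(deliver(failingListener, 3, e -> true, recoverer));

        // Policy that excludes the reject exception: the listener runs once.
        System.out.println(deliver(failingListener, 3,
                e -> !(e instanceof RejectAndDontRequeue), recoverer));
    }
}
```

With the retry-everything policy the model calls the listener three times, matching the three error logs in the question; with the exception excluded from the policy it calls the listener once and goes straight to the recoverer.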