Spring - Close RabbitMQ consumer connection without closing the Spring application
We have a requirement to close the RabbitMQ connection, wait a few minutes while other resources are cleaned up, and only then shut down the Spring application. We have a 6-minute buffer for that cleanup. However, during this window the RabbitMQ connection is recreated automatically.
We create the connections as follows:
@Configuration
public class RabbitMQSubscribersHolder extends BaseRabbitMQConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSubscribersHolder.class);

    // One listener container per MQ alias, kept so it can be looked up later.
    private final ConcurrentMap<String, SimpleMessageListenerContainer> containers = new ConcurrentHashMap<>();

    public RabbitMQSubscribersHolder() {
    }

    public void addSubscriber(String mqAlias, MessageListener receiver) {
        ConnectionFactory connectionFactory = this.connectionFactory(mqAlias);
        try {
            // Declare the exchange, queue and binding for this alias.
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            AbstractExchange exchange = this.exchange(mqAlias);
            admin.declareExchange(exchange);
            Queue queue;
            if (!StringUtils.isEmpty(this.getMqQueueNameForQueueAlias(mqAlias))) {
                queue = this.queue(mqAlias);
                admin.declareQueue(queue);
            } else {
                // No queue name configured: let the broker generate one.
                queue = admin.declareQueue();
            }
            admin.declareBinding(this.binding(mqAlias, queue, exchange));

            // Create and start the container that consumes from the queue.
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queue.getName());
            container.setMessageListener(this.listenerAdapter(receiver));
            container.setRabbitAdmin(admin);
            container.start();
            this.containers.put(mqAlias, container);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    public SimpleMessageListenerContainer getContainer(String mqAlias) {
        return this.containers.get(mqAlias);
    }
}
This is our destroy method:
public void destroy() {
    // Destroy every cached connection factory, one per alias.
    for (Map.Entry<String, CachingConnectionFactory> entry : this.aliasToConnectionFactory.entrySet()) {
        try {
            entry.getValue().destroy();
            LOGGER.info("RabbitMQ caching connection factory closed for alias: {}", entry.getKey());
        } catch (Exception e) {
            LOGGER.error("RabbitMQ caching connection destroy operation failed for alias: {} due to {}", entry.getKey(), StringUtils.getStackTrace(e));
        }
    }
}
It seems the connection is recreated automatically as soon as it is destroyed (verified in the RabbitMQ management UI). Here are the logs:
2022-07-11 19:42:55.334 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: preMatchFreeze
2022-07-11 19:42:55.395 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-tSAgXuPdMnLd9q79ryBWGg=fury_pre_matchfreeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@205b73d8 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:55.405 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: parentMatchFreezeEds
2022-07-11 19:42:55.447 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: fantasy_matchFreeze
2022-07-11 19:42:55.489 INFO 35761 --- [ Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ : RabbitMQ caching connection factory closed for alias: parentMatchFreezeCore
2022-07-11 19:42:55.489 INFO 35761 --- [ Thread-11] c.g.f.fury.GracefulShutdownHandler : Waiting before starting graceful shutdown
2022-07-11 19:42:55.794 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-sK8_FY4PtHW52HXkQV3S_w=fury_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@4bc9389 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:56.046 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3828e912 [delegate=amqp://[email protected]:5672/, localPort= 58170]
2022-07-11 19:42:56.098 INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-KyvnWaqZco9NRTessjupJQ=fantasy_parent_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@17d45cfb Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:43:13.127 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@4c3cefad [delegate=amqp://[email protected]:5672/, localPort= 58182]
2022-07-11 19:43:16.849 INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@801eaf8 [delegate=amqp://[email protected]:5672/, localPort= 58185]
Answer:
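You need to stop the message listener container(s) before destroying the connection factories. A container that is still running treats the closed connection as a failure and restarts its consumers, which is what the "Restarting Consumer" and "Created new connection" log lines show.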
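A minimal sketch of that ordering, assuming nothing beyond the JDK: the class RabbitShutdownSequence and its method names are hypothetical and not part of Spring AMQP. It takes the "stop" and "destroy" steps as plain Runnable actions so that every container is stopped before any connection factory is destroyed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not a Spring AMQP class): runs every
 * "stop container" action before any "destroy connection factory" action.
 */
class RabbitShutdownSequence {

    // Insertion-ordered so the actions run in registration order.
    private final Map<String, Runnable> containerStops = new LinkedHashMap<>();
    private final Map<String, Runnable> factoryDestroys = new LinkedHashMap<>();

    /** Register the stop action of a listener container, e.g. container::stop. */
    void registerContainer(String mqAlias, Runnable stop) {
        containerStops.put(mqAlias, stop);
    }

    /** Register the destroy action of a connection factory, e.g. factory::destroy. */
    void registerConnectionFactory(String mqAlias, Runnable destroy) {
        factoryDestroys.put(mqAlias, destroy);
    }

    /**
     * Stops all containers first, then destroys all connection factories.
     * Returns the aliases whose action threw, so the caller can log them.
     */
    List<String> shutdown() {
        List<String> failedAliases = new ArrayList<>();
        runAll(containerStops, failedAliases);
        runAll(factoryDestroys, failedAliases);
        return failedAliases;
    }

    // Runs each action; one failure does not prevent the remaining actions.
    private static void runAll(Map<String, Runnable> actions, List<String> failedAliases) {
        for (Map.Entry<String, Runnable> entry : actions.entrySet()) {
            try {
                entry.getValue().run();
            } catch (RuntimeException e) {
                failedAliases.add(entry.getKey());
            }
        }
    }
}
```

With the code from the question, the wiring would presumably be registerContainer(mqAlias, container::stop) for each entry in the containers map and registerConnectionFactory(alias, factory::destroy) for each entry in aliasToConnectionFactory, followed by a single shutdown() call at the start of the 6-minute window. SimpleMessageListenerContainer.stop() and CachingConnectionFactory.destroy() are existing Spring AMQP methods; whether they fit Runnable without extra exception handling in your version is an assumption worth checking.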