Home > Enterprise >  How to close Rabbitmq Connection before Spring context
How to close Rabbitmq Connection before Spring context

Time:07-13

Spring - Close RabbitMQ Consumer connection without closing spring application

We have a requirement we want to close the rabbitmq connection, wait for few minutes before cleaning up other resources and then closing the spring application. We have a buffer of 6 minutes to clean up the other resources. However, during this time, the rabbitmq connection is auto recreated.

We are creating the connections as follows...

@Configuration
public class RabbitMQSubscribersHolder extends BaseRabbitMQConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSubscribersHolder.class);
    private ConcurrentMap<String, SimpleMessageListenerContainer> containers = new ConcurrentHashMap();

    public RabbitMQSubscribersHolder() {
    }

    public void addSubscriber(String mqAlias, MessageListener receiver) {
        ConnectionFactory connectionFactory = this.connectionFactory(mqAlias);
        RabbitAdmin admin = null;

        try {
            admin = new RabbitAdmin(connectionFactory);
            AbstractExchange exchange = this.exchange(mqAlias);
            admin.declareExchange(exchange);
            Queue queue = null;
            if (!StringUtils.isEmpty(this.getMqQueueNameForQueueAlias(mqAlias))) {
                queue = this.queue(mqAlias);
                admin.declareQueue(queue);
            } else {
                queue = admin.declareQueue();
            }

            admin.declareBinding(this.binding(mqAlias, queue, exchange));
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(new String[]{queue.getName()});
            container.setMessageListener(this.listenerAdapter(receiver));
            container.setRabbitAdmin(admin);
            container.start();
            this.containers.put(mqAlias, container);
        } catch (Exception var8) {
            LOGGER.error(var8.getMessage(), var8);
        }

    }

    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    public SimpleMessageListenerContainer getContainer(String mqAlias) {
        return (SimpleMessageListenerContainer)this.containers.get(mqAlias);
    }

This is our destroy method

public void destroy() {
        Iterator var1 = this.aliasToConnectionFactory.entrySet().iterator();

        while(var1.hasNext()) {
            Map.Entry<String, CachingConnectionFactory> entry = (Map.Entry)var1.next();

            try {
                ((CachingConnectionFactory)entry.getValue()).destroy();
                LOGGER.info("RabbitMQ caching connection factory closed for alias: {}", entry.getKey());
            } catch (Exception var4) {
                LOGGER.error("RabbitMQ caching connection destroy operation failed for alias: {} due to {}", entry.getKey(), StringUtils.getStackTrace(var4));
            }
        }

    }

It seems the connection is re-created automatically as soon as it is destroyed (verified from Rabbit UI). Here are the logs:

2022-07-11 19:42:55.334  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: preMatchFreeze
2022-07-11 19:42:55.395  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-tSAgXuPdMnLd9q79ryBWGg=fury_pre_matchfreeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@205b73d8 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:55.405  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: parentMatchFreezeEds
2022-07-11 19:42:55.447  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: fantasy_matchFreeze
2022-07-11 19:42:55.489  INFO 35761 --- [      Thread-11] c.g.f.f.r.GracefulShutdownRabbitMQ       : RabbitMQ caching connection factory closed for alias: parentMatchFreezeCore
2022-07-11 19:42:55.489  INFO 35761 --- [      Thread-11] c.g.f.fury.GracefulShutdownHandler       : Waiting before starting graceful shutdown
2022-07-11 19:42:55.794  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-sK8_FY4PtHW52HXkQV3S_w=fury_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@4bc9389 Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:42:56.046  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@3828e912 [delegate=amqp://[email protected]:5672/, localPort= 58170]
2022-07-11 19:42:56.098  INFO 35761 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{amq.ctag-KyvnWaqZco9NRTessjupJQ=fantasy_parent_match_freeze_queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@17d45cfb Shared Rabbit Connection: null, acknowledgeMode=AUTO local queue size=0
2022-07-11 19:43:13.127  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@4c3cefad [delegate=amqp://[email protected]:5672/, localPort= 58182]
2022-07-11 19:43:16.849  INFO 35761 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@801eaf8 [delegate=amqp://[email protected]:5672/, localPort= 58185]

CodePudding user response:

You need to stop the message listener container(s) to prevent them from trying to reconnect.

  • Related