ActiveMQ consumer waiting for redelivered messages instead of processing other messages in the queue

Time:12-10

I am using ActiveMQ 5.15.6, and I would like to take 2 messages from a queue, process them, and commit the transacted session.

However, if an exception is thrown, the messages are marked as redelivered and returned to the queue, and the application waits until the redelivery delay expires before trying to process them again.

I need to process the other messages in the queue until the delayed messages are ready to be delivered again.

The code is only a proof of concept :)

public class ActiveMqService {

    private final ActiveMQConnectionFactory cf;
    private final Connection connection;
    private final Session session;
    private final Queue queue;
    private final MessageConsumer consumer;

    public ActiveMqService() throws JMSException {
        cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        RedeliveryPolicy redeliveryPolicy = cf.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(5000L);
        redeliveryPolicy.setRedeliveryDelay(5000L);
        redeliveryPolicy.setMaximumRedeliveries(100);
        ActiveMQPrefetchPolicy prefetchPolicy = cf.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(2);
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
        connection = cf.createConnection(); //credentials
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        queue = session.createQueue("test");
        connection.start();
        consumer = session.createConsumer(queue);

    }

    @PostConstruct
    public void read() throws JMSException {

        while (true) {
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                Message message = consumer.receive(1000L);
                if (message != null) {
                    messages.add(message);
                } else {
                    break; // no more messages available for this batch
                }
            }

            if (messages.size() > 0) {
                try {
                    Random random = new Random();
                    if (1 == random.nextInt(2)) {
                        for (Object message : messages) {
                            TextMessage textMessage = (TextMessage) message;
                            log.info("Message {}", textMessage.getText());
                        }
                        session.commit();
                    } else {
                        throw new Exception("vyjimka"); // simulated failure; "vyjimka" is Czech for "exception"
                    }
                } catch (Exception e) {
                    List<String> mess = new ArrayList<>();
                    for (Object message : messages) {
                        TextMessage textMessage = (TextMessage) message;
                        mess.add(textMessage.getText());
                    }
                    log.info("rollbackuju message {}", mess); // "rollbackuju" is Czech for "rolling back"
                    session.rollback();
                }
            }
        }
    }
}

In the log you can see that after a rollback the application waits 5 seconds (the configured delay) for the redelivered messages, but doesn't process the other messages in the queue in the meantime.

2021-12-09 17:05:47.605  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 7
2021-12-09 17:05:47.606  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 3
2021-12-09 17:05:47.612  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [8, 5]
2021-12-09 17:05:52.623  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [8, 5]
2021-12-09 17:05:57.634  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 8
2021-12-09 17:05:57.635  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 5
2021-12-09 17:05:57.642  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 9
2021-12-09 17:05:57.642  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 2
2021-12-09 17:05:57.648  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : rollbackuju message [6, 4]
2021-12-09 17:06:02.655  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 6
2021-12-09 17:06:02.656  INFO 14500 --- [           main] c.example.testactivemq.ActiveMqService   : Message 4

CodePudding user response:

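You need to invoke setNonBlockingRedelivery(true) on your org.apache.activemq.ActiveMQConnectionFactory (the method is ActiveMQ-specific and is not part of javax.jms.ConnectionFactory) in the ActiveMqService() constructor, e.g.: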

public ActiveMqService() throws JMSException {
    cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    cf.setNonBlockingRedelivery(true);
    ...
}

The comment for this method states:

 /**
  * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
  * from a rolled back transaction.  This implies that message order will not be preserved and
  * also will result in the TransactedIndividualAck option to be enabled.
  */
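
To make the ordering remark concrete, here is a toy model in plain Java. It is not ActiveMQ code: the class RedeliveryModel, its run method, and the logical clock are all made up for illustration, and it assumes that every message listed in failOnce is rolled back exactly once and succeeds on redelivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical, not ActiveMQ internals) of blocking vs. non-blocking redelivery. */
final class RedeliveryModel {

    /**
     * Returns the order in which messages end up being processed successfully.
     * Time is a logical clock: each delivery attempt costs one tick, and a rolled-back
     * message becomes deliverable again `delay` ticks after the failed attempt.
     */
    static List<String> run(List<String> queue, Set<String> failOnce, long delay, boolean nonBlocking) {
        Deque<String> fresh = new ArrayDeque<>(queue);      // messages never delivered so far
        Deque<String> delayedBodies = new ArrayDeque<>();   // rolled-back messages, oldest first
        Deque<Long> delayedReadyAt = new ArrayDeque<>();    // matching "ready again" times
        Set<String> alreadyFailed = new HashSet<>();
        List<String> processed = new ArrayList<>();
        long clock = 0;

        while (!fresh.isEmpty() || !delayedBodies.isEmpty()) {
            boolean redeliveryDue = !delayedBodies.isEmpty() && delayedReadyAt.peekFirst() <= clock;
            if (redeliveryDue) {
                // The delay has expired: redeliver the rolled-back message (it succeeds this time).
                delayedReadyAt.pollFirst();
                processed.add(delayedBodies.pollFirst());
                clock++;
            } else if (!delayedBodies.isEmpty() && (!nonBlocking || fresh.isEmpty())) {
                // Blocking mode, or nothing else left to do: sit idle until the delay expires.
                clock = delayedReadyAt.peekFirst();
            } else {
                // Deliver the next fresh message; roll it back if it is marked to fail once.
                String body = fresh.pollFirst();
                if (failOnce.contains(body) && alreadyFailed.add(body)) {
                    delayedBodies.addLast(body);
                    delayedReadyAt.addLast(clock + delay);
                } else {
                    processed.add(body);
                }
                clock++;
            }
        }
        return processed;
    }
}
```

With the queue 1, 2, 3, 4, message 1 failing once, and a delay of 5 ticks, run(..., false) gives [1, 2, 3, 4] because the consumer idles until message 1 is ready again, while run(..., true) gives [2, 3, 4, 1] because the other messages are handled during the delay. That reordering is the trade-off the comment warns about.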

Additionally, you'll probably want to set your prefetch size higher than the current value of 2 since the 2 messages waiting to be redelivered will prevent additional messages from being fetched from the broker.
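
Putting both suggestions together, the connection setup might look roughly like the configuration sketch below. The class name NonBlockingRedeliveryConfig and the prefetch value of 20 are assumptions picked for illustration; the broker URL and redelivery settings are the ones from the question. It needs the ActiveMQ client library on the classpath and a running broker.

```java
import javax.jms.Connection;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

// Hypothetical helper class; only the factory and policy calls come from ActiveMQ itself.
public class NonBlockingRedeliveryConfig {

    public static Connection connect() throws JMSException {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Keep delivering other messages while rolled-back ones wait out their delay.
        // Message order is no longer guaranteed once this is enabled.
        cf.setNonBlockingRedelivery(true);

        // Same redelivery settings as in the question.
        RedeliveryPolicy redeliveryPolicy = cf.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(5000L);
        redeliveryPolicy.setRedeliveryDelay(5000L);
        redeliveryPolicy.setMaximumRedeliveries(100);

        // Assumed value: larger than the batch size of 2, so messages waiting for
        // redelivery do not use up the whole prefetch window.
        cf.getPrefetchPolicy().setQueuePrefetch(20);

        Connection connection = cf.createConnection();
        connection.start();
        return connection;
    }
}
```

Tune the prefetch to your batch size and to how many messages you expect to be waiting for redelivery at once.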
