I am using ActiveMQ 5.15.6 and would like to take 2 messages from a queue, process them, and commit the transacted session.
However, if an exception is thrown, the messages are marked as redelivered and returned to the queue, and the application waits until the redelivery delay expires before trying to process them again.
I need to process the other messages in the queue until the delayed messages are ready to be delivered again.
The code is only a proof of concept :)
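import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import javax.annotation.PostConstruct;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;

public class ActiveMqService {

    // Logger declaration is assumed (SLF4J); the original snippet only shows its usage.
    private static final Logger log = LoggerFactory.getLogger(ActiveMqService.class);

    private final ActiveMQConnectionFactory cf;
    private final Connection connection;
    private final Session session;
    private final Queue queue;
    private final MessageConsumer consumer;

    public ActiveMqService() throws JMSException {
        cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Redeliver rolled-back messages after 5 seconds, up to 100 times.
        RedeliveryPolicy redeliveryPolicy = cf.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(5000L);
        redeliveryPolicy.setRedeliveryDelay(5000L);
        redeliveryPolicy.setMaximumRedeliveries(100);

        // The broker pushes at most 2 unacknowledged messages to this consumer.
        ActiveMQPrefetchPolicy prefetchPolicy = cf.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(2);

        // Created but not used in this proof of concept.
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);

        connection = cf.createConnection(); //credentials
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        queue = session.createQueue("test");
        connection.start();
        consumer = session.createConsumer(queue);
    }

    @PostConstruct
    public void read() throws JMSException {
        while (true) {
            // Collect a batch of up to 2 messages.
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                Message message = consumer.receive(1000L);
                if (message != null) {
                    messages.add(message);
                } else {
                    break; // no more messages available for this batch
                }
            }
            if (!messages.isEmpty()) {
                try {
                    // Randomly succeed or fail to simulate a processing error.
                    Random random = new Random();
                    if (1 == random.nextInt(2)) {
                        for (Message message : messages) {
                            TextMessage textMessage = (TextMessage) message;
                            log.info("Message {}", textMessage.getText());
                        }
                        session.commit();
                    } else {
                        throw new Exception("vyjimka"); // Czech for "exception"
                    }
                } catch (Exception e) {
                    List<String> mess = new ArrayList<>();
                    for (Message message : messages) {
                        TextMessage textMessage = (TextMessage) message;
                        mess.add(textMessage.getText());
                    }
                    log.info("rollbackuju message {}", mess); // "rollbackuju" = "rolling back"
                    session.rollback();
                }
            }
        }
    }
}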
As the log shows, after a rollback the application waits 5 seconds (the configured delay) for the messages to be redelivered, but it does not process the other messages in the queue in the meantime.
2021-12-09 17:05:47.605 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 7
2021-12-09 17:05:47.606 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 3
2021-12-09 17:05:47.612 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [8, 5]
2021-12-09 17:05:52.623 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [8, 5]
2021-12-09 17:05:57.634 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 8
2021-12-09 17:05:57.635 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 5
2021-12-09 17:05:57.642 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 9
2021-12-09 17:05:57.642 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 2
2021-12-09 17:05:57.648 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : rollbackuju message [6, 4]
2021-12-09 17:06:02.655 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 6
2021-12-09 17:06:02.656 INFO 14500 --- [ main] c.example.testactivemq.ActiveMqService : Message 4
Answer:
You need to invoke setNonBlockingRedelivery(true) on your ActiveMQConnectionFactory in the ActiveMqService() constructor, e.g.:
public ActiveMqService() throws JMSException {
    cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    cf.setNonBlockingRedelivery(true);
    ...
}
The comment for this method states:
/**
 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
 * from a rolled back transaction. This implies that message order will not be preserved and
 * also will result in the TransactedIndividualAck option to be enabled.
 */
Additionally, you'll probably want to set your prefetch size higher than the current value of 2, since the 2 messages waiting to be redelivered will prevent additional messages from being fetched from the broker.
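To see why the prefetch size matters once redelivery is non-blocking, here is a small toy model in plain Java. It is only a sketch and does not use or reproduce ActiveMQ's code: the class PrefetchModel, its run method, and the notion of a "tick" are all made up for illustration. It assumes that messages waiting for redelivery still count against the prefetch window, that the first batch is rolled back exactly once, and that every other batch commits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: it imitates the behaviour described above, not ActiveMQ's real code.
final class PrefetchModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<Integer> commitOrder; // message ids in the order they were committed
        final int idleTicks;             // ticks in which the consumer had nothing to process

        Result(List<Integer> commitOrder, int idleTicks) {
            this.commitOrder = commitOrder;
            this.idleTicks = idleTicks;
        }
    }

    static Result run(int prefetch, int batchSize, int delayTicks, int messageCount) {
        if (prefetch < 1 || batchSize < 1) {
            throw new IllegalArgumentException("prefetch and batchSize must be at least 1");
        }
        Deque<Integer> broker = new ArrayDeque<>();
        for (int id = 1; id <= messageCount; id++) {
            broker.add(id);
        }
        Deque<Integer> ready = new ArrayDeque<>();  // prefetched and deliverable now
        List<Integer> delayed = new ArrayList<>();  // rolled back, waiting for the delay
        int delayedUntil = 0;
        boolean failedOnce = false;
        List<Integer> committed = new ArrayList<>();
        int idleTicks = 0;

        for (int tick = 0; committed.size() < messageCount; tick++) {
            // Rolled-back messages become deliverable again once the delay has expired.
            if (!delayed.isEmpty() && tick >= delayedUntil) {
                ready.addAll(delayed);
                delayed.clear();
            }
            // The broker tops up the window; delayed messages still occupy slots.
            while (ready.size() + delayed.size() < prefetch && !broker.isEmpty()) {
                ready.add(broker.poll());
            }
            // The consumer takes up to batchSize deliverable messages.
            List<Integer> batch = new ArrayList<>();
            while (batch.size() < batchSize && !ready.isEmpty()) {
                batch.add(ready.poll());
            }
            if (batch.isEmpty()) {
                idleTicks++; // nothing deliverable: the consumer just waits
                continue;
            }
            if (!failedOnce) {
                // Simulated rollback of the very first batch.
                failedOnce = true;
                delayed.addAll(batch);
                delayedUntil = tick + delayTicks;
            } else {
                committed.addAll(batch);
            }
        }
        return new Result(committed, idleTicks);
    }

    public static void main(String[] args) {
        for (int prefetch : new int[] {2, 4}) {
            Result r = run(prefetch, 2, 3, 6);
            System.out.println("prefetch=" + prefetch
                    + " order=" + r.commitOrder + " idleTicks=" + r.idleTicks);
        }
    }
}
```

With the values used in main (batches of 2, a delay of 3 ticks, 6 messages), a prefetch of 2 leaves the model's consumer idle for two ticks because the rolled-back pair fills the whole window, while a prefetch of 4 lets it commit messages 3, 4, 5 and 6 before 1 and 2 come back. That is the same trade-off as in the answer: other messages keep flowing, but order is no longer preserved.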