How to consume multiple messages per request using Spring AMQP?

Time:08-03

Using RabbitMQ, I have a producer that publishes a large number of messages (tens of thousands) to a queue. I would like the consumer to pull a block of 1000 messages per request from the queue. We are using the Spring AMQP implementation to receive messages, but we are trying to figure out how to pull more than one message per request. Prefetch doesn't seem to be the right option, and batch processing seems to have to be done on the producer side.

What options are there for a consumer to request a block of messages and process them as a group?

Example of our setup:

@RabbitListener(queues = "queue")
void listen(String in) {
    log.info(in);
}

CodePudding user response:

Batching on the consumer side has been supported since version 2.2:

/**
 * Set to true to present a list of messages based on the {@link #setBatchSize(int)},
 * if the listener supports it. This will coerce {@link #setDeBatchingEnabled(boolean)
 * deBatchingEnabled} to true as well.
 * @param consumerBatchEnabled true to create message batches in the container.
 * @since 2.2
 * @see #setBatchSize(int)
 */
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
    this.consumerBatchEnabled = consumerBatchEnabled;
}

It is only supported by the SimpleMessageListenerContainer.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#de-batching

Starting with version 2.2, the SimpleMessageListenerContainer can be used to create batches on the consumer side (where the producer sent discrete messages).

Set the container property consumerBatchEnabled to enable this feature.

Change the listener method's parameter from String in to List<String> in.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#receiving-batch


@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}
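The factory above only sets batchListener. For batches assembled on the consumer side, the container properties mentioned earlier have to be set as well. A minimal sketch, assuming a ConnectionFactory bean is already defined; the bean name batchContainerFactory, the class names, and the timeout value are illustrative, not taken from the question:

```java
import java.util.List;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
class ConsumerBatchConfig {

    // Hypothetical bean name; referenced by the listener below.
    @Bean
    SimpleRabbitListenerContainerFactory batchContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);        // the listener method receives a List
        factory.setConsumerBatchEnabled(true); // the container builds batches from discrete messages
        factory.setBatchSize(1000);            // upper bound on messages per batch
        factory.setReceiveTimeout(1000L);      // assumed value (ms); tune for your traffic
        return factory;
    }
}

@Component
class BlockListener {

    // Same queue as in the question, now consumed as a block.
    @RabbitListener(queues = "queue", containerFactory = "batchContainerFactory")
    void listen(List<String> in) {
        System.out.println("received a block of " + in.size() + " messages");
    }
}
```

As far as I know, a batch can contain fewer than batchSize messages when the receive timeout elapses with no new message arriving, so the listener should not assume exactly 1000 entries.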

When using Spring Boot auto-configuration for the container factory, set spring.rabbitmq.listener.simple.consumer-batch-enabled=true (and spring.rabbitmq.listener.simple.batch-size).

You also have to set batchListener=true. Boot does not provide a property for that, but you can add something like this to set it:

@Component
class FactoryCustomizer {

    FactoryCustomizer(SimpleRabbitListenerContainerFactory factory) {
        factory.setBatchListener(true);
    }

}
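To make the effect of batch-size more concrete, here is a plain-Java sketch of the "collect up to N messages, or stop when nothing arrives in time" idea. It is not Spring's implementation: an in-memory BlockingQueue stands in for the RabbitMQ queue, and the names BatchSketch and nextBatch are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchSketch {

    // Collects up to batchSize items. Stops early, returning a short
    // (possibly empty) batch, if no item arrives within receiveTimeoutMs.
    static <T> List<T> nextBatch(BlockingQueue<T> source, int batchSize, long receiveTimeoutMs) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            T item;
            try {
                item = source.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (item == null) {
                break; // timed out waiting for the next message
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the broker queue, preloaded with 2500 messages.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 2500; i++) {
            queue.add("msg-" + i);
        }
        List<String> batch;
        while (!(batch = nextBatch(queue, 1000, 50)).isEmpty()) {
            System.out.println("processing a batch of " + batch.size());
        }
    }
}
```

With 2500 queued messages and a batch size of 1000, the loop sees two full batches and then one short batch of 500.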

CodePudding user response:

My suggestion is to publish the messages to multiple queues (for example, 5 queues) and enable a consumer for each of them. If the data is divided across 5 queues, 5 consumer threads (one per queue) can run in parallel, so consumption can be up to roughly 5 times faster.
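A rough sketch of the "divide the data across 5 queues" step, in plain Java so it runs without a broker. The queue names work.0 to work.4 and the round-robin rule are assumptions for illustration only; any rule that spreads messages evenly would do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class QueueSharding {

    // Hypothetical naming scheme: work.0 .. work.(queueCount - 1), chosen round-robin.
    static String queueFor(int messageIndex, int queueCount) {
        return "work." + Math.floorMod(messageIndex, queueCount);
    }

    // Groups payloads by the queue each one would be published to.
    static Map<String, List<String>> assign(List<String> payloads, int queueCount) {
        Map<String, List<String>> byQueue = new LinkedHashMap<>();
        for (int i = 0; i < payloads.size(); i++) {
            byQueue.computeIfAbsent(queueFor(i, queueCount), k -> new ArrayList<>())
                   .add(payloads.get(i));
        }
        return byQueue;
    }

    public static void main(String[] args) {
        List<String> payloads = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            payloads.add("msg-" + i);
        }
        assign(payloads, 5).forEach((queue, msgs) -> System.out.println(queue + " <- " + msgs));
    }
}
```

In a real producer, the grouping step would presumably be replaced by one send per message, using queueFor(i, 5) as the routing key, with one listener per queue. Note that message order is then only preserved within each queue, not across them.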
