Home > front end >  Spring Integration MQTT parallelization
Spring Integration MQTT parallelization

Time:01-13

I wrote a Spring application using Integration flows that reads some MQTT messages and puts them in incomingMqttMessageChannel:

  @Bean
  public IntegrationFlow incomingMqttMessageFlow() {
    return IntegrationFlows.from(mqttPahoMessageDrivenChannelAdapter())
        .channel("incomingMqttMessageChannel").get();
  }

  public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
        mqttBroker, UUID.randomUUID().toString(), incomingMqttTopic);
    //...
  }

  //...

And then I use some Spring Integration annotations to process the messages in incomingMqttMessageChannel, e.g.:

  @Transformer(inputChannel = "incomingMqttMessageChannel", outputChannel = "entityChannel")
  public Entity transform(byte[] mqttMessage){
    //transform mqtt message to other Entity
  }

I performed some tests and I realized that with this code messages were processed one by one.

I want to process the MQTT messages I receive in parallel using a thread pool, not running several Spring applications.

According to this the MqttPahoMessageDrivenChannelAdapter is single-threaded.

Is there any way to parallelize message processing in this case? Which are the options I have?

Thanks in advance.

CodePudding user response:

Make your incomingMqttMessageChannel as an Executor channel:

.channel(c -> c.executor("incomingMqttMessageChannel", threadPoolTaskExecutor))

This way your MQTT messages are going to consumed from that channel from threads of that executor.

See more info in docs: https://docs.spring.io/spring-integration/reference/html/core.html#executor-channel

  • Related