I wrote a Spring application using Integration flows that reads some MQTT messages and puts them in incomingMqttMessageChannel
:
@Bean
public IntegrationFlow incomingMqttMessageFlow() {
return IntegrationFlows.from(mqttPahoMessageDrivenChannelAdapter())
.channel("incomingMqttMessageChannel").get();
}
public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttBroker, UUID.randomUUID().toString(), incomingMqttTopic);
//...
}
//...
And then I use some Spring Integration annotations to process the messages in incomingMqttMessageChannel
, e.g.:
@Transformer(inputChannel = "incomingMqttMessageChannel", outputChannel = "entityChannel")
public Entity transform(byte[] mqttMessage){
//transform mqtt message to other Entity
}
I performed some tests and I realized that with this code messages were processed one by one.
I want to process the MQTT messages I receive in parallel using a thread pool, not running several Spring applications.
According to this the MqttPahoMessageDrivenChannelAdapter is single-threaded.
Is there any way to parallelize message processing in this case? Which are the options I have?
Thanks in advance.
CodePudding user response:
Make your incomingMqttMessageChannel
as an Executor
channel:
.channel(c -> c.executor("incomingMqttMessageChannel", threadPoolTaskExecutor))
This way your MQTT messages are going to consumed from that channel from threads of that executor.
See more info in docs: https://docs.spring.io/spring-integration/reference/html/core.html#executor-channel