Please can someone help me to understand where is the probleme in this config: Versions :
- org.springframework.integration:spring-integration-mqtt:5.5.2
- org.springframework.boot:spring-boot-starter:2.5.3
- org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5
@Configuration
public class MqttConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://localhost:1883" });
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageDrivenChannelAdapter("MyApp", clientFactory, "ReplyTopic");
}
@Bean
IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
return IntegrationFlows.from(inboundAdapter)
.bridge()
.channel("replyChannel")
.get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.publishSubscribe().get();;
}
@Bean
public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageHandler("MyApp", clientFactory);
}
@Bean
public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
return IntegrationFlows.from("requestChannel")
.handle(outboundAdapter).get()
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
}
}
Client code
@RestController
public class MyController {
@Autowired
private MyGateway myGateway;
@GetMapping("/sendRequest")
public String sendRequest() {
var response = myGateway.send("Hello", "MyTopic");
return response;
}
}
Usage:
curl http://localhost:8080/sendRequest
manual response from the mqtt broker (HiveMQ)
docker exec -it hivemq mqtt pub -t ReplyTopic -m "World" --debug
CLIENT mqttClient-MQTT_5_0-9ecded84-8416-4baa-a8f3-d593c692bc65: acknowledged PUBLISH: 'World' for PUBLISH to Topic: ReplyTopic
But I dont know why i have this message on the Spring application output
2022-10-25 18:04:33.171 ERROR 17069 --- [T Call: MyApp] .m.i.MqttPahoMessageDrivenChannelAdapter : Unhandled exception for GenericMessage [payload=World, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=9dbd5e14-66ed-5dc8-6cea-6d04ef19c6cc, mqtt_receivedTopic=ReplyTopic, mqtt_receivedQos=0, timestamp=1666713873170}]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@6f63903c]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Please can someone explain why i have this ?
no output-channel or replyChannel header available
CodePudding user response:
I think the problem you are facing is not related to your bridge()
configuration.
This comes from the MessagingGatewaySupport
and its replyMessageCorrelator
feature which is activated by your replyChannel = "replyChannel"
.
The real problem that you are trying to do what is not possible with MQTT v3. There is just no headers transferring over MQTT broker to carry on a required for gateway initiator a correlation key - the TemporaryReplyChannel
. See more in docs about gateway: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway.
In other words: independently of the replyChannel
configuration on gateway, the replyChannel
header must be present in the reply message. This is the way how gateway correlates requests with replies.
You have to look into an aggregator to send the request message in parallel and to preserve the mentioned TemporaryReplyChannel
header. Then when you receive a reply (inboundAdapter
) you send it to this aggregator. You need to ensure some correlation key from a request and reply payload, so they can match and fulfill group for reply to be sent back to the gateway.
See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator