Home > OS >  Async RabbitMQ communcation using Spring Integration
Async RabbitMQ communcation using Spring Integration

Time:11-07

I have two spring boot services that communicate using RabbitMQ. Service1 sends request for session creation to Service2. Service2 handles request and should return response. Service1 should handle the response.

Service1 method for requesting session:

public void startSession()
{
     ListenableFuture<SessionCreationResponseDTO> sessionCreationResponse = sessionGateway.requestNewSession();

     sessionCreationResponse.addCallback(response -> {
             //handle success
     }, ex -> {
            // handle exception
     });
}

On Service1 I have defined AsyncOutboundGateway, like:

@Bean
public IntegrationFlow requestSessionFlow(MessageChannel requestNewSessionChannel, 
                                          AsyncRabbitTemplate amqpTemplate,
                                          SessionProperties sessionProperties)
{
        return flow -> flow.channel(requestNewSessionChannel)
                .handle(Amqp.asyncOutboundGateway(amqpTemplate)
                        .exchangeName(sessionProperties.getRequestSession().getExchangeName())
                        .routingKey(sessionProperties.getRequestSession().getRoutingKey()));
    }

On Service2, I have flow for receiving these messages:

@Bean
public IntegrationFlow requestNewSessionFlow(ConnectionFactory connectionFactory, 
                                             SessionProperties sessionProperties,
                                             MessageConverter messageConverter, 
                                             RequestNewSessionHandler requestNewSessionHandler)
{
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, 
                                sessionProperties.requestSessionProperties().queueName())
                .handle(requestNewSessionHandler)
                .get();

Service2 handles there requests:

@ServiceActivator(async = "true")
public ListenableFuture<SessionCreationResponseDTO> handleRequestNewSession()
{
    SettableListenableFuture<SessionCreationResponseDTO> settableListenableFuture = new SettableListenableFuture<>();

       // Goes through asynchronous process of creating session and sets value in listenable future

    return settableListenableFuture;
}

Problem is that Service2 immediately returns ListenableFuture to Service1 as message payload, instead of waiting for result of future and sending back result.

If I understood documentation correctly Docs by setting async parameter in @ServiceActivator to true, successful result should be returned and in case of exception, error channel would be used.

Probably I misunderstood documentation, so that I need to unpack ListenableFuture in flow of Service2 before returning it as response, but I am not sure how to achieve that.

I tried something with publishSubscribeChannel but without much luck.

CodePudding user response:

Your problem is here:

.handle(requestNewSessionHandler)

Such a configuration doesn't see your @ServiceActivator(async = "true") and uses it as a regular blocking service-activator.

Let's see if this helps you:

.handle(requestNewSessionHandler, "handleRequestNewSession", e -> e.async(true))

It is better to think about it like: or only annotation configuration. or only programmatic, via Java DSL.

  • Related