How to see the types that flow in Spring Integration's IntegrationFlow

Time:03-15

I'm trying to understand what type is returned when I aggregate in Spring Integration, and it's pretty hard to tell. I'm using Project Reactor, and my code snippet is:

public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
    FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
    f.setWindowTimespan(Duration.ofSeconds(5));
    f.setCombineFunction(messageFlux -> messageFlux
        .map(Message::getPayload)
        .collectList()
        .map(GenericMessage::new));
    return f;
}

@Bean
public IntegrationFlow dataPipeline() {
   return IntegrationFlows.from(somePublisher)
// ----> Is a Message<?> passed here, or a Flux<Message<?>>?
      .handle(randomIdsBatchAggregator())
// ----> What type does the aggregation return?
      .handle(bla())
      .get();
}

Beyond understanding the types passed in this example, I want to know how, in general, I can find out which objects flow through an IntegrationFlow and what their types are.

Answer:

IntegrationFlows.from(somePublisher)

This internally creates a FluxMessageChannel, which subscribes to the provided Publisher. Every single event is emitted from this channel to its subscriber, which is your aggregator.

The FluxAggregatorMessageHandler produces whatever is explained in the setCombineFunction() JavaDocs:

/**
 * Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
 * Requires a {@link Mono} result with a {@link Message} as value as a combination result
 * of the incoming {@link Flux} for window.
 * By default a {@link Flux} for window is fully wrapped into a message with headers copied
 * from the first message in window. Such a {@link Flux} in the payload has to be subscribed
 * and consumed downstream.
 * @param combineFunction the {@link Function} to use for result windows transformation.
 */
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {

So the result is a Mono that emits a single message, which is exactly what your .collectList() chain produces. The framework subscribes to that Mono and sends the emitted message as the reply from the FluxAggregatorMessageHandler. Therefore your .handle(bla()) must expect a message whose payload is a list of payloads, which is the natural result for an aggregator.
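To make the shape of that reply concrete, here is a minimal plain-Java sketch of what the combine function does to one window. It is only an analogy and does not use Spring Integration or Reactor: `Msg` is a hypothetical stand-in for `Message`, a `List` stands in for one window of the `Flux`, and the class and field names are made up for illustration.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CombineFunctionSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message<T>; not the real type.
    static final class Msg<T> {
        private final T payload;

        Msg(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    // Stand-in for the combine function: one window of messages in, one message out.
    // The real signature is Function<Flux<Message<?>>, Mono<Message<?>>>.
    static final Function<List<Msg<?>>, Msg<?>> COMBINE = window ->
            new Msg<>(window.stream()
                    .<Object>map(Msg::getPayload)     // mirrors .map(Message::getPayload)
                    .collect(Collectors.toList()));   // mirrors .collectList()

    public static void main(String[] args) {
        // Three messages that arrived within the same window.
        List<Msg<?>> window = List.of(
                new Msg<String>("id-1"),
                new Msg<String>("id-2"),
                new Msg<String>("id-3"));

        // The aggregator replies with ONE message per window.
        Msg<?> reply = COMBINE.apply(window);

        System.out.println(reply.getPayload() instanceof List); // prints "true"
        System.out.println(reply.getPayload());                 // prints "[id-1, id-2, id-3]"
    }
}
```

In the real flow the same reasoning applies: whatever follows the aggregator receives one message per window, and its payload is the list built by the combine function.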

See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator
