I try to understand what's the type that returns when I aggregate in Spring Integration and that's pretty hard. I'm using Project Reactor and my code snippet is:
public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
f.setWindowTimespan(Duration.ofSeconds(5));
f.setCombineFunction(messageFlux -> messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new);
return f;
}
@Bean
public IntegrationFlow dataPipeline() {
return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
.handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
.handle(bla())
.get();
}
More than understanding the types that passes in the example, I want to know in general how can I know what are the objects that flows in the IntegrationFlow
and their types.
CodePudding user response:
IntegrationFlows.from(somePublisher)
This creates a FluxMessageChannel
internally which subscribes to the provided Publsiher
. Every single event is emitted from this channel to its subscriber - your aggregator.
The FluxAggregatorMessageHandler
produces whatever is explained in the setCombineFunction()
JavaDocs:
/**
* Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
* Requires a {@link Mono} result with a {@link Message} as value as a combination result
* of the incoming {@link Flux} for window.
* By default a {@link Flux} for window is fully wrapped into a message with headers copied
* from the first message in window. Such a {@link Flux} in the payload has to be subscribed
* and consumed downstream.
* @param combineFunction the {@link Function} to use for result windows transformation.
*/
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
So, it is a Mono
with a message which you really do with your .collectList()
. That Mono
is subscribed by the framework when it emits a reply message from the FluxAggregatorMessageHandler
. Therefore your .handle(bla())
must expect a list of payloads. Which is really natural for the aggregator result.
See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator