I've seen the issue about accessing flux at the middle of an IntegrationFlow
and I wonder why I succeed writing logic inside the flux in the following way:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux -> a ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
I wonder first of all why I never get errors thrown to the console, but when debugging I see errors.
I also want to know how can this work and why do I need the a
variable (that always produces NullPointerException
, even though the flow can continue and work fine). When I omit the a
variable:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
I get an exception Bad return type in lambda expression: Disposable cannot be converted to Publisher<Message<?>>
- e.g. code cant compile code due to types problem.
CodePudding user response:
The logic you write in that e.reactive()
is not correct.
See documentation for that endpoint configurer option to understand its purpose:
/**
* Make the consumer endpoint as reactive independently of an input channel and
* apply the provided function into the {@link Flux#transform(Function)} operator.
* @param reactiveCustomizer the function to transform {@link Flux} for the input channel.
* @return the spec
* @since 5.5
*/
public S reactive(Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer) {
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-reactive
You cannot do those "active" operation like flatMap()
in this configurer. You fully eliminate the purpose of the flow at all. That flatMap
must be done in the handler downstream. Such a bridge
does nothing meaningful. It just turns the current flow state into reactive. When the handle()
with similar reactive()
configurer would do the same, but also would apply a purpose of this handler - its handle part.
The .subscribe()
is not correct to do there at all. Let the framework to deal with provided reactive stream on top of Spring Integration! That's why you mislead yourself with that flux -> a ->
. It does compile and let you run because it is not compile or configuration part. It is indeed a callback evaluated at runtime, when you already send a message.
The writeToSolr
may be used like this:
flowDefinition
.channel(c -> c.flux())
.handle(new ReactiveMessageHandlerAdapter((message) -> writeToSolr(message.getPayload())))
I think we will revise reactive()
of the endpoint to expose only those Flux
operators which are only for configuration. The rest is out of current endpoint configuration: it has to be done in the target handler method logic.
In addition I think we can introduce a handleReactive(ReactiveMessageHandler)
as a terminal operator of the IntegrationFlow
to simplify that ReactiveMessageHandlerAdapter
usage.