Using non-blocking calls I want to generically take a Mono and call a method that returns a Flux and for each item in the Flux, call a method that returns Mono to return a Flux which is a an aggregate object of Bar Foo Bar and has as many elements as the Flux method returns (will return).
As a concrete example: Methods:
Flux<Bar> getBarsByFoo(Foo foo);
Mono<More> getMoreByBar(Bar bar);
Combined getCombinedFrom(Bar bar, Foo foo, More more);
Working code section:
Flux<Combined> getCombinedByFoo(Foo foo) {
getBarsByFoo(foo)...
}
From a blocking perspective what I want to accomplish is:
List<Combined> getCombinedByFoo(Foo foo) {
List<Bar> bars = getBarsByFoo(foo):
List<Combined> combinedList = new ArrayList<>(bars.size());
for (Bar bar: bars) {
More more = getMoreByBar(bar);
combinedList.append(getCombinedFrom(bar, foo, more));
}
return combinedList;
}
Any help on which Flux and Mono methods to use would be appreciated. I am still learning to change my brain into non-blocking thinking. Conceptually, I think there is a function to apply to each element (Bar) in from getBarsByFoo(Foo foo) to somehow map that to the combined element...
CodePudding user response:
Something like this:
Flux<Combined> getCombinedByFoo(Foo foo) {
return getBarsByFoo(foo)
.flatMap(bar -> getMoreByBar(bar)
.flatMap(more -> getCombinedFrom(bar, foo, more)))
}
I dont have an idea to check, i wrote this by free hand but something like this i guess.
CodePudding user response:
I like to think about Reactor programming as a flow of operations (as in flow programming), as a chain/DAG of operation.
In your case, you want to:
- map each emitted Bar object to a Combined object.
- Along the way, you need to use/call another publisher to fetch additional information:
- you need to wait for it to complete so you can fetch its output value. In the case of Monads/streams, there's a flatMap operation for it.
- flatMap waits for (or you can say that it extracts) a different publisher value to integrate it in the current chain of operations. I think it is called flatMap because in a sense, we break a level of hierarchy to flatten two nested publishers/monads in a single merged one.
The following example show a reactive version of your method (for a less verbose version, see Toerktumlare answer:
Flux<Combined> combine(Foo foo) {
Flux<Bar> bars = getBarBy(foo);
Flux<Combined> result = bars.flatMap(bar -> {
Mono<More> nextMore = getMoreBy(bar);
Mono<Combined> next = nextMore.map(more -> getCombinedFrom(foo, bar, more));
return next;
);
return result;
}
If you get your foo object through a Mono, you can just call flatMapMany
on it:
Mono<Foo> nextFoo = ...;
Flux<Combined> = nextFoo.flatMapMany(foo -> combine(foo));
WARNING
flatMap is very powerful: it can trigger concurrent execution of the provided operation. In your case, it means that many getMoreBy(bar)
operations can be launched at the same time. But it is a double-edged sword, because then it means that:
- ordering of elements is not preserved (or at least, there's no guarantee)
- In resource constrained system, having multiple operations launched at the same time could hurt performance or cause harm to the system (like, too many files open, etc.)
The concurrency behavior is quite high by default (256) and can be controlled in different ways:
- flatMap accepts an optional concurrency argument, to adapt the number of tasks allowed to run at the same time.
- There are other operators that flatten publishers, but manage work differently, like concatMap: it enforces sequential execution (and therefore, preserve ordering) of mapping tasks.