I have a list of Monos I would like to process, but they have to be executed sequentially: the next one should start only after the previous Mono has completed.
private Mono<List<Result>> processGoals(List<Goal> goals, Data data) {
    List<Mono<Result>> plans = goals
            .stream()
            .map(goal -> processGoal(goal, data))
            .collect(Collectors.toList());
}
I tried to use
return Flux.concat(plans).subscribeOn(Schedulers.single()).collectList();
But this starts the next Mono before the previous one has completed.
CodePudding user response:
Instead of mixing two paradigms, Java Streams and Reactive Streams, you can go fully reactive.
Try the following:
Mono<List<Result>> res = Flux.fromIterable(goals)
        .flatMapSequential(goal -> processGoal(goal, data))
        .subscribeOn(Schedulers.single())
        .collectList();
CodePudding user response:
.flatMapSequential(goal -> processGoal(goal, data), 1)
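The last parameter, a concurrency of 1, is very important. Without it, flatMapSequential subscribes to several inner Monos eagerly and only preserves the order of their results, so the work itself still overlaps. I tried this before without the concurrency parameter and it didn't work. Kudos to @Michael McFadyen.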
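To make the "start the next one only after the previous one completes" requirement concrete, here is a minimal plain-JDK sketch of the same ordering. It is an analogy built on CompletableFuture, not Reactor code: the class SequentialTasks and the helpers runSequentially and goal are hypothetical names invented for illustration, and goal merely stands in for processGoal. Each task is wrapped in a Supplier so that it is not started until the previous future has completed, which is the behaviour a concurrency of 1 gives you (Reactor's concatMap also subscribes to one inner publisher at a time).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Reactor or of the original code.
class SequentialTasks {

    // Chains the tasks so that task i+1 is only started (Supplier.get is only
    // called) after the future returned by task i has completed.
    static <T> CompletableFuture<List<T>> runSequentially(
            List<Supplier<CompletableFuture<T>>> tasks) {
        List<T> empty = new ArrayList<>();
        CompletableFuture<List<T>> chain = CompletableFuture.completedFuture(empty);
        for (Supplier<CompletableFuture<T>> task : tasks) {
            chain = chain.thenCompose(results ->
                    task.get().thenApply(value -> {
                        results.add(value);
                        return results;
                    }));
        }
        return chain;
    }

    // Hypothetical stand-in for processGoal: records when the work starts and
    // ends, sleeps for delayMs, then returns the upper-cased name.
    static Supplier<CompletableFuture<String>> goal(
            String name, long delayMs, List<String> log) {
        return () -> {
            log.add("start " + name);
            return CompletableFuture.supplyAsync(() -> {
                sleep(delayMs);
                log.add("end " + name);
                return name.toUpperCase();
            });
        };
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // The first task is slower than the second; the second must still wait.
        List<String> results = runSequentially(Arrays.asList(
                goal("a", 50, log),
                goal("b", 10, log))).join();
        System.out.println(results); // prints [A, B]
        System.out.println(log);     // prints [start a, end a, start b, end b]
    }
}
```

If the tasks were started up front, for example by calling an eager method while building the list, "start b" could be logged before "end a" even though the results would still arrive in order. That is the difference between ordering the results and ordering the execution.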