I want to run a Flux in parallel but collect the results in their original order. Suppose I have a Flux of [3, 2, 1]. After running some task on each element in parallel, I expect the result to still be [3, 2, 1].
val mono = Flux.fromIterable(3 downTo 1)
    .map { it.toString() }
    // task returns the same number it was given
    .flatMap { number -> task(number) }
    .doOnNext { println("Number of $it") }
    // Prints 1, 3, 2, which is fine because I want the tasks to run in parallel
    .collectList()
    .doOnNext { println(it) }
    // Still prints [1, 3, 2], but I want [3, 2, 1], matching the iterable's order
    .block()
Answer:
I think the flatMapSequential operator is what you are looking for.
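Flux.fromIterable(3 downTo 1)
    .map { it.toString() }
    // Subscribes to every task eagerly, but emits results in source order
    .flatMapSequential { number -> task(number) }
    .doOnNext { println("Number of $it") }
    .collectList()
    .doOnNext { println(it) }
    // Subscribe and wait for the list, as in the question
    .block()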
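Here, flatMapSequential eagerly subscribes to its inner publishers (in parallel, like flatMap) but merges their results in the order of the source elements, as desired. Results from later elements that arrive early are queued until every earlier element has been emitted.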
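Sample output (the first three lines show the order in which the tasks completed, apparently logged inside task, since a doOnNext placed after flatMapSequential already sees the elements in source order):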
3
1
2
[3, 2, 1]
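To see the difference between the two operators side by side, here is a minimal, self-contained sketch. The question does not show task, so the version below is a hypothetical stand-in that simply completes after a delay proportional to its number (so "1" finishes first and "3" last). collectWith is likewise just a helper name made up for this example. It assumes reactor-core is on the classpath.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the question's task(): it returns the same
// number after a delay of number * 100 ms, so smaller numbers finish first.
fun task(number: String): Mono<String> =
    Mono.delay(Duration.ofMillis(number.toLong() * 100))
        .map { number }

// Hypothetical helper: runs the same pipeline with either operator
// and returns the collected list.
fun collectWith(useSequential: Boolean): List<String> {
    val source = Flux.fromIterable(3 downTo 1).map { it.toString() }
    val merged =
        if (useSequential) source.flatMapSequential { task(it) }
        else source.flatMap { task(it) }
    return merged.collectList().block()!!
}

fun main() {
    // flatMap emits in completion order, which depends on timing
    println(collectWith(useSequential = false))
    // flatMapSequential emits in source order: [3, 2, 1]
    println(collectWith(useSequential = true))
}
```

With these delays the flatMap run should typically print [1, 2, 3], but that depends on timing. The flatMapSequential run prints [3, 2, 1] regardless of which task finishes first, and both runs take roughly as long as the slowest task because the inner publishers are subscribed to eagerly.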