How to run flux in parallel, but collect result in sequence

Time:12-01

I want to run a Flux in parallel but collect the results in sequence. Suppose I have a Flux of [3, 2, 1]; after running some task on each element in parallel, I expect the result to still be [3, 2, 1].

val mono = Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      // this will return the same number
      .flatMap { number -> task(number) }
      .doOnNext { println("Number of $it") }
      // I got 1, 3, 2 which is good because I want it to run in parallel
      .collectList()
      .doOnNext { println(it) }
      // I still got [1,3,2] but I want it to be [3,2,1] according to the iterable order.
      .block()

CodePudding user response:

I think the flatMapSequential operator is what you are looking for.

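Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      // subscribes to every inner publisher eagerly, but emits results in source order
      .flatMapSequential { number -> task(number) }
      .doOnNext { println("Number of $it") }
      .collectList()
      .doOnNext { println(it) }
      // nothing runs until the chain is subscribed to; block() subscribes and waits
      .block()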

Here, flatMapSequential subscribes eagerly to its inner publishers (so they run concurrently, as with flatMap) but merges their results in the order of the source elements, queuing any results that arrive early until their turn comes.

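Sample output (the individual numbers are printed as each task completes, for example from a log statement inside task; a doOnNext placed after flatMapSequential receives them in source order):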

3
1
2
[3, 2, 1]
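
To see the ordering guarantee in isolation, here is a minimal sketch. It assumes reactor-core is on the classpath. The task function below is a hypothetical stand-in for the one in the question (whose body is not shown): it simply emits its input after a delay proportional to the number, so "1" completes first and "3" completes last. The name collectInSourceOrder is also made up for illustration.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the question's task(): emits its input after a
// delay proportional to the number, so "1" finishes first and "3" last.
fun task(number: String): Mono<String> =
    Mono.delay(Duration.ofMillis(number.toLong() * 100)).map { number }

// Runs all tasks concurrently, but collects the results in source order.
fun collectInSourceOrder(): List<String> =
    Flux.fromIterable(3 downTo 1)
        .map { it.toString() }
        .flatMapSequential { number -> task(number) }
        .collectList()
        .block() ?: emptyList()

fun main() {
    println(collectInSourceOrder()) // prints [3, 2, 1]
}
```

Swapping flatMapSequential for flatMap in this sketch should give [1, 2, 3] instead, because the results are then merged in completion order rather than source order.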