Reactive Stream: merge flux data into for loop

Time:01-20

I have a readEvents function which returns a Flux of events. In the code below I tried to merge all the events using mergeWith, but allEventFlux always comes back empty.

    val ids = repository.findIds().map { it.ekycId }
    val allEventFlux = Flux.empty<Event>()
    for (id in ids) {
        val events: Flux<Event> = eventStore.readEvents(id)
        allEventFlux.mergeWith(events)
    }

CodePudding user response:

Have you tried:

    Flux.fromIterable(repository.findIds())
        .map { it.ekycId }
        .flatMap { id -> eventStore.readEvents(id) }

P.S.: your current example cannot work, because mergeWith does not merge in place. It returns a new Flux instance, which must be assigned to a variable. So, at a minimum, you should modify your example as follows:

    val eventFlux = ids.fold(Flux.empty<Event>()) { all, id -> all.mergeWith(eventStore.readEvents(id)) }
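
To make the difference concrete, here is a minimal sketch comparing the loop from the question with a version that keeps the Flux returned by mergeWith. It assumes reactor-core is on the classpath. The readEvents function below is a hypothetical stand-in for eventStore.readEvents (it emits two made-up String events per id), and mergeDiscarded / mergeReassigned are illustrative names, not part of any API.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical stand-in for eventStore.readEvents(id): emits two events per id.
fun readEvents(id: String): Flux<String> = Flux.just("$id-created", "$id-verified")

// Same shape as the loop in the question: the Flux returned by mergeWith is dropped.
fun mergeDiscarded(ids: List<String>): Flux<String> {
    val all = Flux.empty<String>()
    for (id in ids) {
        all.mergeWith(readEvents(id)) // result ignored, `all` is still the empty Flux
    }
    return all
}

// Keeps the new Flux on every iteration, which is what the fold version does.
fun mergeReassigned(ids: List<String>): Flux<String> {
    var all: Flux<String> = Flux.empty()
    for (id in ids) {
        all = all.mergeWith(readEvents(id)) // reassign to the merged Flux
    }
    return all
}

fun main() {
    val ids = listOf("a", "b")
    // block() is used only to inspect the result in this demo.
    println(mergeDiscarded(ids).collectList().block())  // empty list
    println(mergeReassigned(ids).collectList().block()) // all four events
}
```

The reassigning loop and the fold build the same chain of merges. The flatMap version is usually easier to read and also works when the ids themselves come from a reactive source.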