I have a readEvents function which returns a Flux of events. In the code below I tried to merge all the events using mergeWith, but allEventFlux is always empty:
val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty<Event>()
for (id in ids) {
    val events: Flux<Event> = eventStore.readEvents(id)
    allEventFlux.mergeWith(events)
}
CodePudding user response:
Have you tried:
Flux.fromIterable(repository.findIds())
    .map { it.ekycId }
    .flatMap { id -> eventStore.readEvents(id) }
P.S.: your current example cannot work because mergeWith does not merge in place. It returns a new Flux instance, which must be assigned to a variable; in your loop that result is discarded, so allEventFlux stays the original empty Flux. At a minimum, you should modify your example as follows:
val eventFlux = ids.fold(Flux.empty<Event>()) { all, id -> all.mergeWith(eventStore.readEvents(id)) }
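To see the difference side by side, here is a minimal sketch, assuming reactor-core is on the classpath. The Event class and the top-level readEvents function are hypothetical stand-ins for the question's types and for eventStore.readEvents, which are not shown in the post. It compares the original loop, a loop that reassigns the result, and the flatMap version.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical stand-in for the question's Event type.
data class Event(val id: String, val seq: Int)

// Hypothetical stand-in for eventStore.readEvents(id): two events per id.
fun readEvents(id: String): Flux<Event> = Flux.just(Event(id, 1), Event(id, 2))

// The question's loop: every mergeWith result is thrown away,
// so the returned flux is still the original empty one.
fun mergedInLoopDiscarded(ids: List<String>): Flux<Event> {
    val all: Flux<Event> = Flux.empty()
    for (id in ids) {
        all.mergeWith(readEvents(id)) // new Flux returned here is never stored
    }
    return all
}

// Same loop, but the merged flux is assigned back on each iteration.
fun mergedInLoopReassigned(ids: List<String>): Flux<Event> {
    var all: Flux<Event> = Flux.empty()
    for (id in ids) {
        all = all.mergeWith(readEvents(id))
    }
    return all
}

// The flatMap version: one inner flux per id, merged into a single flux.
fun mergedWithFlatMap(ids: List<String>): Flux<Event> =
    Flux.fromIterable(ids).flatMap { id -> readEvents(id) }

fun main() {
    val ids = listOf("a", "b")
    // block() is used only to print results in this demo.
    println(mergedInLoopDiscarded(ids).collectList().block()?.size)  // prints 0
    println(mergedInLoopReassigned(ids).collectList().block()?.size) // prints 4
    println(mergedWithFlatMap(ids).collectList().block()?.size)      // prints 4
}
```

The flatMap version is probably the one to prefer when the ids themselves come from a reactive source, since it avoids building a long chain of nested merges.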