Home > database >  Flux .then() running before complete signal
Flux .then() running before complete signal

Time:09-21

I tried to do something with Flux streaming objects and after handling all elements do some last work and finish a Mono but it doesn't work:

 // data and id comming from a webrequest
 // myRepository is a org.springframework.data.r2dbc.repository.R2dbcRepository
 myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
    .then (doOnFinish(data))
    .subscribe();

Mono<DbObject> doSomethingWithDbObjectAndSave (DbObject dbo, DataObject data){
...
}
Mono<Void> doOnFinish(DataObject data){
...
}

The problem: Even I try this, the function "doOnFinish" is called before the first element pass doSomethingWithDbObjectAndSave" but I change something on the data object and would like to do this before!

The I tried to change the code:

myRepository.findById(id)
        .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
        .last()
        .flatMap(dbObject  -> doOnFinish(data))
        .subscribe(); 

I hoped, that I could use the last element to trigegr the onFinish function but I got "flux#last() didn't observe any onnext signal" and do not undetstand this!

Anyone any idea?

CodePudding user response:

then(methodCall(data)) will eagerly evaluate the parameter expression and thus call methodCall even before then is entered. You need a method which lazily evaluates its parameters.

I think you are looking for doOnComplete:

public final Flux<T> doOnComplete(Runnable onComplete)

Add behavior (side-effect) triggered when the Flux completes successfully.

myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
    .doOnComplete(() -> doOnFinish(data))
    .subscribe();

CodePudding user response:

Wow ... hours later I found a solution. doOnComplete doesn't return the stream with the async call from doOnFinish (Mono). Now I found this:

myRepository.findById(id)
   .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
   .takeLast(1)
   .flatMap(dbObject  -> doOnFinish(data))
   .subscribe();

That works for me ....

  • Related