I am trying to process the elements streamed by a Flux and, once all elements have been handled, do some final work that completes a Mono, but it doesn't work:
// data and id come from a web request
// myRepository is an org.springframework.data.r2dbc.repository.R2dbcRepository
myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject, data))
    .then(doOnFinish(data))
    .subscribe();
Mono<DbObject> doSomethingWithDbObjectAndSave(DbObject dbo, DataObject data) {
    ...
}
Mono<Void> doOnFinish(DataObject data) {
    ...
}
The problem: even with this code, doOnFinish is called before the first element passes through doSomethingWithDbObjectAndSave, but I modify the data object along the way and need that to happen first!
Then I tried changing the code:
myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject, data))
    .last()
    .flatMap(dbObject -> doOnFinish(data))
    .subscribe();
I hoped I could use the last element to trigger doOnFinish, but I got "Flux#last() didn't observe any onNext signal" and I don't understand why!
Does anyone have an idea?
CodePudding user response:
then(methodCall(data)) will eagerly evaluate the parameter expression and thus call methodCall even before then is entered. You need a method which lazily evaluates its parameters.
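To make the eager-evaluation point concrete, here is a minimal plain-Java sketch with no Reactor dependency. The names finish, eager and lazy are hypothetical stand-ins: finish plays the role of doOnFinish(data), eager behaves like an operator that receives an already-built argument, and lazy takes a Supplier instead. In Reactor, Mono.defer accepts such a Supplier, so then(Mono.defer(() -> doOnFinish(data))) would be the analogous lazy form, assuming doOnFinish does its work when it is called rather than when its Mono is subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerVsLazy {
    // Records the order in which things happen.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for doOnFinish(data): has a visible side effect when called.
    static String finish() {
        LOG.add("finish called");
        return "done";
    }

    // Like then(x): Java evaluates the argument before the method body starts.
    static String eager(String alreadyComputed) {
        LOG.add("operator entered");
        return alreadyComputed;
    }

    // Like then(Mono.defer(() -> x)): the supplier is invoked only when it is needed.
    static String lazy(Supplier<String> deferred) {
        LOG.add("operator entered");
        return deferred.get();
    }

    static List<String> runEager() {
        LOG.clear();
        eager(finish());
        return new ArrayList<>(LOG);
    }

    static List<String> runLazy() {
        LOG.clear();
        lazy(() -> finish());
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(runEager()); // [finish called, operator entered]
        System.out.println(runLazy());  // [operator entered, finish called]
    }
}
```

In the eager case the side effect happens before the receiving method even starts, which matches the behaviour described in the question.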
I think you are looking for doOnComplete:
public final Flux<T> doOnComplete(Runnable onComplete)
Add behavior (side-effect) triggered when the Flux completes successfully.
myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject, data))
    .doOnComplete(() -> doOnFinish(data))
    .subscribe();
CodePudding user response:
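Wow... hours later I found a solution. doOnComplete doesn't work here because it only runs a callback: the Mono returned by doOnFinish is never subscribed to, so its async work never becomes part of the stream. This is what I found instead: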
myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject, data))
    .takeLast(1)
    .flatMap(dbObject -> doOnFinish(data))
    .subscribe();
That works for me.
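One caveat with takeLast(1): if the source emits nothing, the flatMap never runs, so the final step is skipped (last() instead signals the NoSuchElementException quoted in the question). The sketch below, which assumes reactor-core is on the classpath, compares that with then(Mono.defer(...)), which subscribes to the final Mono on completion even for an empty source. The finish method and the in-memory log are hypothetical stand-ins for doOnFinish and its side effect.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FinishAfterFlux {
    // Hypothetical stand-in for doOnFinish: records that it ran when subscribed, then completes.
    static Mono<Void> finish(List<String> log) {
        return Mono.fromRunnable(() -> log.add("finish"));
    }

    // takeLast(1) + flatMap: finish runs only if the source emitted at least one element.
    static List<String> viaTakeLast(Flux<String> source) {
        List<String> log = new ArrayList<>();
        source.doOnNext(log::add)
              .takeLast(1)
              .flatMap(last -> finish(log))
              .blockLast(); // blocking is for the demo only
        return log;
    }

    // then(Mono.defer(...)): finish is built and subscribed after the source completes,
    // whether or not the source emitted anything.
    static List<String> viaThenDefer(Flux<String> source) {
        List<String> log = new ArrayList<>();
        source.doOnNext(log::add)
              .then(Mono.defer(() -> finish(log)))
              .block(); // blocking is for the demo only
        return log;
    }

    public static void main(String[] args) {
        System.out.println(viaTakeLast(Flux.just("a", "b")));  // [a, b, finish]
        System.out.println(viaTakeLast(Flux.empty()));         // []
        System.out.println(viaThenDefer(Flux.just("a", "b"))); // [a, b, finish]
        System.out.println(viaThenDefer(Flux.empty()));        // [finish]
    }
}
```

Which variant fits depends on whether the final step should still run when nothing was found.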