Home > Net >  Can Flux.doOnComplete() be called more than one time?
Can Flux.doOnComplete() be called more than one time?

Time:06-09

I have following code snippet with Spring Boot and WebFlux:

return  mongoTemplateReactive.find(query, Document.class, COLLECTIONNAME).
                doOnNext(value -> ctr.incrementAndGet()).
                doOnCancel(()->log.info("----Cancel Received----")).    
                doOnError(ex->{                     
                    log.info("----Error----"   ex); 
                    throw new IllegalStateException(ex);
                }).
                onBackpressureBuffer(100000, BufferOverflowStrategy.DROP_OLDEST).
                doOnComplete(()->{
                    log.info("----OnComplete----");
                });

Following is the output logs i receive and did not receive onCancel a single time:

----OnComplete----
----OnComplete----

This happens only when there is no records found from the MongoDB while executing the query. In case there are any valid records returned then everything is working fine.

Since doOncomplete() is called multiple times, so when it is called for the first time, it works good but after that any object in that block gets nullified and I get NullPointerException. As per my understanding doOncomplete() should be called only once in the end.

Request you to please help.

CodePudding user response:

The operators of the publisher (Flux or Mono), including the Mongo query itself, will execute each time the publisher is subscribed-to. Spring Boot Reactive subscribes once to the final result publisher once, in order to get the response to send the client; but you may well be using your publisher in several places, and each time it is used, it'll be subscribed-to anew.

You may want to examine how this publisher is used, and make sure you are not accidentally using it more than once. You might find that you can adjust your code so that it is not used more than once.

If there's no other choice, you might want to add a call to .cache() somewhere in your code, so that your Mongo query is only executed once. You do need to be sure that you do this caching in the right place so that you do not accidentally cache results for one user that are then seen by another user, for example, but that you do use the cache appropriately.

Finally, the doOnX() methods are really for monitoring-style operations. It's not good practice to do resource-management in them. It would be a good idea to see if there are alternative ways of doing whatever it is you are doing, in order to fit in with the reactive-streams paradigm.

  • Related