How do I create a Flux using generate wrapping calls that return a Mono

Time:12-08

I have an example where I want to use Flux.generate because I don't want to make the expensive blocking call unless and until a subscriber asks for it. Specifically, I am calling Elasticsearch multiple times (effectively paging) until there are no more hits. I have implemented this with standard blocking calls in an Iterator<SearchResponse>: each call to the generate lambda blocks, and the flux is then finished with .subscribeOn(Schedulers.boundedElastic()). However, I would like to use Spring's ReactiveElasticsearchClient, which returns a Mono<SearchResponse>, while still making the calls one at a time.

Here is the previous code using blocking:


  public Iterator<SearchResponse> createDeepQueryIterator(@NonNull PITSearchInput input){
    return new PointInTimeIterator(elasticClient, input);
  }

  public Flux<SearchResponse> createDeepQueryFlux(@NonNull PITSearchInput input){
    return Flux.<SearchResponse, PointInTimeIterator>generate(
            () -> new PointInTimeIterator(elasticClient, input),
            (deepQueryIterator, sink) -> {
              if (deepQueryIterator.hasNext()) {
                sink.next(deepQueryIterator.next());
              }else{
                sink.complete();
              }
              return deepQueryIterator;
            },
            (deepQueryIterator) -> deepQueryIterator.shutdown())
        .subscribeOn(Schedulers.boundedElastic());
  }

The above works well in that it waits to make the next call to ES until the subscriber is ready for the next block of data.
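The PointInTimeIterator class itself is not shown here. As a rough illustration of why the iterator approach stays lazy, here is a minimal sketch in plain Java. Everything in it is hypothetical (LazyPageIterator, fakeSearch, integer "hits" and an offset cursor stand in for SearchResponse and the PIT/search_after cursor); the only point is that the backend is called inside next() and nowhere else.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

final class LazyPagingDemo {

    /** Hypothetical iterator: the backend is called only inside next(), i.e. only on demand. */
    static final class LazyPageIterator implements Iterator<List<Integer>> {
        private final IntFunction<List<Integer>> fetchPage; // offset -> hits (the expensive call)
        private final int pageSize;
        private int offset;        // stands in for the search_after cursor
        private boolean exhausted; // set once a short page has been returned

        LazyPageIterator(IntFunction<List<Integer>> fetchPage, int pageSize) {
            this.fetchPage = fetchPage;
            this.pageSize = pageSize;
        }

        @Override
        public boolean hasNext() {
            return !exhausted; // no backend call here
        }

        @Override
        public List<Integer> next() {
            if (exhausted) {
                throw new NoSuchElementException();
            }
            List<Integer> hits = fetchPage.apply(offset); // the only place the backend is hit
            offset += hits.size();
            exhausted = hits.size() < pageSize; // a short (or empty) page is the last one
            return hits;
        }
    }

    /** Fake backend over the integers [0, total); counts how often it is called. */
    static IntFunction<List<Integer>> fakeSearch(int total, int pageSize, AtomicInteger calls) {
        return offset -> {
            calls.incrementAndGet();
            List<Integer> hits = new ArrayList<>();
            for (int i = offset; i < Math.min(total, offset + pageSize); i++) {
                hits.add(i);
            }
            return hits;
        };
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Iterator<List<Integer>> pages = new LazyPageIterator(fakeSearch(120, 50, calls), 50);
        System.out.println("calls before first next(): " + calls.get());
        while (pages.hasNext()) {
            int size = pages.next().size();
            System.out.println("page of " + size + " hits, backend calls so far: " + calls.get());
        }
    }
}
```

Because Flux.generate invokes its lambda once per requested element, wrapping an iterator of this shape means one backend call per element the subscriber asks for.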

In the code below I am trying to use Spring's ReactiveElasticsearchClient, but the issue is that multiple calls are made to ES before the subscriber has processed the first response.


  public Flux<SearchResponse> createDeepQuery(PointInTimeIteratorFactory.PITSearchInput input) {
    log.info("Creating flux");

    AtomicReference<PitId> pitId = new AtomicReference<>();
    AtomicInteger count = new AtomicInteger();

    Mono<PitId> pitIdMono =
        Mono.fromCallable(
            () -> {
              pitId.set(createPIT(input));
              return pitId.get();
            })
        .subscribeOn(Schedulers.boundedElastic());
    Mono<SearchResponse> searchResponseMono =
        pitIdMono.flatMap(
            p -> {
              log.info("Calling search");
              return reactiveElasticsearchClient.searchForResponse(createSearchRequestFrom(p, input));
            });
    Flux<SearchResponse> expand =
        searchResponseMono
            .expand(
                (searchResponse -> {
                  int hitCount = searchResponse.getHits().getHits().length;
                  count.addAndGet(hitCount);
                  log.info("Previous returned {} hits totaling {}", hitCount, count.get());

                  if (count.get() > input.getMaxTotalSize()
                  || hitCount < input.getMaxSizePerQuery()){
                    log.info("Returning empty");
                    return Mono.empty();
                  }

                  log.info("Calling search");
                  pitId.set(new PitId(searchResponse.pointInTimeId()));
                  return reactiveElasticsearchClient.searchForResponse(
                      createSearchRequestFrom(searchResponse, input));
                }))
            .doFinally(
                p -> {
                  deletePIT(pitId.get());
                });
    return expand;
  }

So the question is how to use the reactive client's ability to return a Mono<SearchResponse> in a Flux, while only making each call one at a time as needed by the subscriber.
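To make the desired behaviour concrete, here is a sketch of "one asynchronous call per unit of demand". It deliberately uses only JDK types as an analogy, not Reactor or the Elasticsearch client: java.util.concurrent.Flow stands in for Flux, CompletableFuture for Mono<SearchResponse>, and all class and method names (PagingPublisher, fakeAsyncSearch, ManualSubscriber) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

final class DemandDrivenPagingDemo {

    /** Hypothetical publisher: starts the next async fetch only while there is unmet demand. */
    static final class PagingPublisher implements Flow.Publisher<List<Integer>> {
        private final IntFunction<CompletableFuture<List<Integer>>> fetchPage; // offset -> future page
        private final int pageSize;

        PagingPublisher(IntFunction<CompletableFuture<List<Integer>>> fetchPage, int pageSize) {
            this.fetchPage = fetchPage;
            this.pageSize = pageSize;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<Integer>> subscriber) {
            subscriber.onSubscribe(new PagingSubscription(subscriber)); // no fetch yet
        }

        private final class PagingSubscription implements Flow.Subscription {
            private final Flow.Subscriber<? super List<Integer>> subscriber;
            private long demand;      // pages requested but not yet fetched
            private int offset;       // cursor for the next fetch
            private boolean inFlight; // true while a fetch and its onNext are in progress
            private boolean finished; // true after the last page, an error, or cancel()

            PagingSubscription(Flow.Subscriber<? super List<Integer>> subscriber) {
                this.subscriber = subscriber;
            }

            @Override
            public void request(long n) {
                if (n <= 0) {
                    return; // simplification: a complete implementation would signal onError
                }
                synchronized (this) {
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                }
                fetchIfNeeded();
            }

            @Override
            public synchronized void cancel() {
                finished = true;
            }

            private void fetchIfNeeded() {
                final int from;
                synchronized (this) {
                    if (inFlight || finished || demand == 0) {
                        return; // a fetch is already running, or nobody asked for more
                    }
                    inFlight = true;
                    demand--;
                    from = offset;
                }
                fetchPage.apply(from).whenComplete((hits, error) -> {
                    boolean last;
                    synchronized (this) {
                        if (finished) {
                            return; // cancelled while the call was in flight
                        }
                        last = error != null || hits.size() < pageSize;
                        if (last) { finished = true; } else { offset += hits.size(); }
                    }
                    if (error != null) { subscriber.onError(error); return; }
                    subscriber.onNext(hits);
                    if (last) { subscriber.onComplete(); return; }
                    synchronized (this) { inFlight = false; }
                    fetchIfNeeded(); // continues only if the subscriber has asked for more
                });
            }
        }
    }

    /** Fake async backend over [0, total); counts calls and completes immediately. */
    static IntFunction<CompletableFuture<List<Integer>>> fakeAsyncSearch(int total, int pageSize, AtomicInteger calls) {
        return offset -> {
            calls.incrementAndGet();
            List<Integer> hits = new ArrayList<>();
            for (int i = offset; i < Math.min(total, offset + pageSize); i++) { hits.add(i); }
            return CompletableFuture.completedFuture(hits);
        };
    }

    /** Subscriber that never requests on its own, so the caller controls demand. */
    static final class ManualSubscriber implements Flow.Subscriber<List<Integer>> {
        Flow.Subscription subscription;
        final List<Integer> pageSizes = new ArrayList<>();
        Throwable error;
        boolean completed;
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(List<Integer> hits) { pageSizes.add(hits.size()); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ManualSubscriber subscriber = new ManualSubscriber();
        new PagingPublisher(fakeAsyncSearch(120, 50, calls), 50).subscribe(subscriber);
        System.out.println("calls before any request: " + calls.get());
        subscriber.subscription.request(1);
        System.out.println("calls after request(1): " + calls.get());
        subscriber.subscription.request(2);
        System.out.println("page sizes " + subscriber.pageSizes + ", completed " + subscriber.completed);
    }
}
```

In this sketch a fetch is started only from fetchIfNeeded(), and only when demand is non-zero and no other fetch is running, so the number of backend calls can never exceed the number of pages requested. The recursion after each page is acceptable for a sketch with a handful of pages; a production version would use a drain loop instead.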

The logging below is from the Flux -> Mono method above, with the PitTest lines coming from the test's onNext() handling of the flux.

2021-12-02 13:13:37.300  INFO 13704 --- [           main] a.a.t.ReactivePointInTimeIteratorFactory : Creating flux
2021-12-02 13:13:37.346  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Creating PIT
2021-12-02 13:13:37.407  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.176  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 50
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877306267
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 100
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606162
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 150
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.272  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606362
2021-12-02 13:13:38.311  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 200
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877906244
2021-12-02 13:13:38.344  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 250
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Returning empty
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Closing PIT ReactivePointInTimeIteratorFactory.PitId(id=m_2xAwENYWN0aXZpdHlzdG9yZRZQQkRGWldmclI2cWZITEpoWDI1cGlRABZCZU8xbm55ZlFabXREYmNEdThESG1RAAAAAAAAWQcTFm5BcXdPU2xTUWE2bEU4dkVPVkpkWFEBFlBCREZaV2ZyUjZxZkhMSmhYMjVwaVEAAA==)
2021-12-02 13:13:40.171  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877306066]
2021-12-02 13:13:42.172  INFO 13704 --- [     parallel-2] p.actss.activity.store.PitTest           : [1634877306272]
2021-12-02 13:13:44.172  INFO 13704 --- [     parallel-3] p.actss.activity.store.PitTest           : [1634877606166]
2021-12-02 13:13:46.173  INFO 13704 --- [     parallel-4] p.actss.activity.store.PitTest           : [1634877906057]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877906248]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : Complete
2021-12-02 13:13:48.174  INFO 13704 --- [           main] p.actss.activity.store.PitTest           : blah
2021-12-02 13:13:48.175  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : onComplete

Update: Adding the PitTest code for completeness:


  @Test
  void testReactoiveFluxIt() throws InterruptedException {
    Flux<SearchResponse> deepQuery = reactivePointInTimeIteratorFactory.createDeepQuery(...);

    deepQuery
        .delayElements(Duration.ofMillis(2000))
        .doOnNext(p -> log.info(Arrays.toString(p.getHits().getHits()[0].getSortValues()))) //
        .doOnComplete(() -> log.info("Complete")) //
        .doFinally(p -> log.info(p.toString()))
        .blockLast();
    log.info("blah");
    Thread.sleep(5000);
  }

Answer:

delayElements switches to the parallel scheduler and delays each emitted element by 2 seconds. This is why the sort values are printed after the search calls have already been logged.
