Home > OS >  How to use the Scroll API with the ReactiveElasticsearchClient
How to use the Scroll API with the ReactiveElasticsearchClient

Time:10-06

I am new to reactive programming and I´m trying to switch to ReactiveElasticsearchClient.

Now the old code uses RestHighLevelClient and does a scroll search with a do-while loop like the following:

SearchRequest searchRequest = searchRequest(indexName)
        .source(searchSource()
                        .query(QueryBuilders.matchAllQuery())
                        .fetchSource(true)
                        .sort("id", SortOrder.ASC)
                        .size(1000))
        .scroll(TimeValue.timeValueSeconds(60L));

SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits searchHits = searchResponse.getHits();
String scrollId = searchResponse.getScrollId();

do {
    /*
    * work with the search hits and
    * and do some logic here
    *
    * */

    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll(TimeValue.timeValueSeconds(60));

    try {
        searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
        searchHits = searchResponse.getHits();
        scrollId = searchResponse.getScrollId();

    } catch (Exception e) {

        LOGGER.error(e.getMessage());
    }

} while (searchHits.getHits().length > 0);

ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);

How can I do the same in a reactive pipeline with the ReactiveElasticsearchClient? Some of the classes that the RestHighLevelClient uses are not available for the reactive client.

CodePudding user response:

Use

Flux<SearchHit> ReactiveElasticsearchClient.scroll(SearchRequest request)

This method executes the given query using the scroll API under the hood. The ReactiveElasticsearchClient maintains the scroll id internally and fetches the next batch of data from Elasticsearch as the caller of the method consumes the returned Flux<SearchHit> until all data is fetched.

As for the classes used by RestHighLevelClient and ReactiveElasticsearchClient:

The RestHighLevelClient and the classes it uses come from the Elasticsearch libraries (rest-highlevel-client and Elasticsearch core). Spring Data Elasticsearch use the RestHighLevelClient to access Elasticsearch. When you are using the RestHighLevelClient directly you are not using Spring Data Elasticsearch but plain Elasticsearch code.

The ReactiveElasticsearchClient is provided by Spring Data Elasticsearch and not by Elasticsearch. It uses classes for input and output from the Elasticsearch libraries as well, but also classes from Spring Data Elasticsearch or the Spring Framework. It's purpose is to enable the reactive access to Elasticsearch with Spring Data Elasticsearch. It's functions are not a reactive copy of the RestHighLevelClient but are what is needed to implement the reactive access. So you will not find corresponding function in the ReactiveElasticsearchClient for every function of the RestHighLevelClient.

  • Related