I am new to reactive programming and I'm trying to switch to ReactiveElasticsearchClient.
The old code uses RestHighLevelClient and does a scroll search with a do-while loop like the following:
    SearchRequest searchRequest = searchRequest(indexName)
        .source(searchSource()
            .query(QueryBuilders.matchAllQuery())
            .fetchSource(true)
            .sort("id", SortOrder.ASC)
            .size(1000))
        .scroll(TimeValue.timeValueSeconds(60L));
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHits searchHits = searchResponse.getHits();
    String scrollId = searchResponse.getScrollId();
    do {
        /*
         * work with the search hits
         * and do some logic here
         */
        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll(TimeValue.timeValueSeconds(60));
        try {
            searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            searchHits = searchResponse.getHits();
            scrollId = searchResponse.getScrollId();
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
    } while (searchHits.getHits().length > 0);
    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
    clearScrollRequest.addScrollId(scrollId);
    restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
How can I do the same in a reactive pipeline with the ReactiveElasticsearchClient? Some of the classes that the RestHighLevelClient uses are not available for the reactive client.
CodePudding user response:
Use
    Flux<SearchHit> ReactiveElasticsearchClient.scroll(SearchRequest request)
This method executes the given query using the scroll API under the hood. The ReactiveElasticsearchClient maintains the scroll id internally and fetches the next batch of data from Elasticsearch as the caller consumes the returned Flux<SearchHit>, until all data has been fetched. That means you need neither the do-while loop nor a SearchScrollRequest of your own.
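To make the "under the hood" part more concrete, here is a minimal plain-Java sketch of the same idea: the caller just consumes hits, while the cursor (standing in for the scroll id) and the batch fetching stay hidden. It is only a model and not the Spring Data Elasticsearch implementation. ScrollModel, FakeIndex and ScrollIterator are hypothetical names made up for this illustration, and a blocking Iterator stands in for the Flux.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java model of a scroll search. All names here are hypothetical;
// this is not Spring Data Elasticsearch code.
class ScrollModel {

    // Stand-in for the server: hands out documents in batches.
    static class FakeIndex {
        final List<String> docs;
        int requests = 0; // number of round trips made so far

        FakeIndex(List<String> docs) {
            this.docs = docs;
        }

        // Returns up to `size` documents starting at `offset`; empty when done.
        List<String> fetch(int offset, int size) {
            requests++;
            int end = Math.min(offset + size, docs.size());
            return new ArrayList<>(docs.subList(offset, end));
        }
    }

    // Keeps the cursor to itself, the way the reactive client keeps the scroll id.
    static class ScrollIterator implements Iterator<String> {
        private final FakeIndex index;
        private final int batchSize;
        private Iterator<String> batch = new ArrayList<String>().iterator();
        private int cursor = 0;
        private boolean exhausted = false;

        ScrollIterator(FakeIndex index, int batchSize) {
            this.index = index;
            this.batchSize = batchSize;
        }

        @Override
        public boolean hasNext() {
            // Ask for the next batch only once the current one is used up.
            while (!batch.hasNext() && !exhausted) {
                List<String> hits = index.fetch(cursor, batchSize);
                cursor += hits.size();
                exhausted = hits.isEmpty(); // an empty batch ends the scroll
                batch = hits.iterator();
            }
            return batch.hasNext();
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return batch.next();
        }
    }

    public static void main(String[] args) {
        FakeIndex index = new FakeIndex(List.of("doc-1", "doc-2", "doc-3", "doc-4", "doc-5"));
        Iterator<String> hits = new ScrollIterator(index, 2);

        // Nothing is fetched until the first hit is requested.
        System.out.println(hits.next() + " after " + index.requests + " request(s)");

        int remaining = 0;
        while (hits.hasNext()) {
            hits.next();
            remaining++;
        }
        // Three batches with data plus one empty batch that ends the scroll.
        System.out.println(remaining + " more hits, " + index.requests + " requests in total");
    }
}
```

With the real client, the consuming loop in main would roughly correspond to something like client.scroll(searchRequest).doOnNext(hit -> ...), where client is assumed to be your ReactiveElasticsearchClient instance and searchRequest is the same SearchRequest as in the question. As with any Flux, nothing is sent to Elasticsearch until the returned Flux is subscribed to.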
As for the classes used by RestHighLevelClient and ReactiveElasticsearchClient:
The RestHighLevelClient and the classes it uses come from the Elasticsearch libraries (rest-highlevel-client and Elasticsearch core). Spring Data Elasticsearch uses the RestHighLevelClient to access Elasticsearch. When you use the RestHighLevelClient directly, you are not using Spring Data Elasticsearch but plain Elasticsearch code.
The ReactiveElasticsearchClient is provided by Spring Data Elasticsearch and not by Elasticsearch. It uses classes from the Elasticsearch libraries for input and output as well, but also classes from Spring Data Elasticsearch or the Spring Framework. Its purpose is to enable reactive access to Elasticsearch with Spring Data Elasticsearch. Its functions are not a reactive copy of the RestHighLevelClient; they are what is needed to implement the reactive access. So you will not find a corresponding function in the ReactiveElasticsearchClient for every function of the RestHighLevelClient.