When I run the stream below, it emits the documents that match at startup but never receives any data written after that.
final long HOUR = 3600000;
final long PAST_HOUR = System.currentTimeMillis()-HOUR;
private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");
protected static ElasticsearchParams constructElasticsearchParams(
String indexName, String typeName, ApiVersion apiVersion) {
if (apiVersion == ApiVersion.V5) {
return ElasticsearchParams.V5(indexName, typeName);
} else if (apiVersion == ApiVersion.V7) {
return ElasticsearchParams.V7(indexName);
}
else {
throw new IllegalArgumentException("API version " + apiVersion + " is not supported");
}
}
String queryStr = "{ \"bool\": { \"must\" : [{\"range\" : {"
+ "\"timestamp\" : { "
+ "\"gte\" : " + PAST_HOUR
+ " }} }]}} ";
ElasticsearchConnectionSettings connectionSettings =
ElasticsearchConnectionSettings.create("****")
.withCredentials("****", "****");
ElasticsearchSourceSettings sourceSettings =
ElasticsearchSourceSettings.create(connectionSettings)
.withApiVersion(ApiVersion.V7);
Source<ReadResult<Stats>, NotUsed> dataSource =
ElasticsearchSource.typed(
constructElasticsearchParams("data", "_doc", ApiVersion.V7),
queryStr,
sourceSettings,
Stats.class);
dataSource.buffer(10000, OverflowStrategy.backpressure());
dataSource.backpressureTimeout(Duration.ofSeconds(1));
dataSource
.log("error")
.runWith(Sink.foreach(a -> System.out.println(a)), actorSystem);
This produces the output:
ReadResult(id=1656107389556,source=Stats(size=0.09471),version=)
Data is continually being written to the index data, but the stream does not process any of it once it has started. Shouldn't the stream continually process data from the upstream source? In this case, the upstream source is an Elasticsearch index named data.
I've tried amending the query to match all documents:
String queryStr = "{\"match_all\": {}}";
but I get the same result.
CodePudding user response:
The Elasticsearch source does not run continuously. It initiates a search, manages pagination (using the scroll API) and streams the results; when Elasticsearch reports that there are no more results, the source completes and so does the stream.
You could do something like
Source.repeat(Done.getInstance()).flatMapConcat(done -> ElasticsearchSource.typed(...))
which will run a new search immediately after the previous one finishes. Note that each new search returns every document that still matches the query, so it would be the responsibility of the downstream to filter out duplicates.
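For the duplicate filtering, one option is to remember the ids that have already been emitted and drop any repeat. The following is only a minimal sketch in plain Java, not part of Alpakka: the class name RecentIdFilter, its firstTime method and the capacity value are all hypothetical, and it assumes document ids are strings and that it is acceptable to forget the oldest ids once the capacity is reached.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: remembers the most recently seen ids, up to a fixed capacity. */
public class RecentIdFilter {
    private final Set<String> seen;

    public RecentIdFilter(final int capacity) {
        // Insertion-ordered map that evicts its eldest entry once capacity is exceeded,
        // so memory use stays bounded however long the stream runs.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /** Returns true the first time an id is offered, false while it is still remembered. */
    public boolean firstTime(String id) {
        return seen.add(id);
    }

    public static void main(String[] args) {
        RecentIdFilter filter = new RecentIdFilter(10_000);

        // Two simulated search passes whose results overlap, as repeated searches would.
        List<List<String>> passes = List.of(
                List.of("1", "2", "3"),
                List.of("2", "3", "4"));

        List<String> emitted = new ArrayList<>();
        for (List<String> pass : passes) {
            for (String id : pass) {
                if (filter.firstTime(id)) {
                    emitted.add(id);
                }
            }
        }
        System.out.println(emitted); // prints [1, 2, 3, 4]
    }
}
```

In the stream, a check like this could sit in a filter stage after the flatMapConcat, keyed on the id of each ReadResult. The helper is not thread-safe, so this assumes it is only ever called from that one stage. Another approach would be to raise the "gte" bound of the range query for each new search so that old documents are not fetched again at all.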