How to enable ElasticSearchIO parallel reads in Apache Beam?


I have a simple pipeline working in both Google Dataflow and the DirectRunner that is able to communicate with my ElasticSearch cluster and process some data. The basic process is this:

Read documents from ElasticSearch using the ElasticsearchIO connector.

Serialize each document into our internal Protobuf.

Perform a transform on the document (no external calls or references) that outputs a different Protobuf.

Write the resulting Protobuf to another ElasticSearch index (a sketch of this step follows the list).
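
For context, the write side looks roughly like this. This is a simplified sketch: esWriteConnection mirrors the read-side connection configuration and ProtoToJsonString is a stand-in for our actual conversion DoFn, since ElasticsearchIO.write() consumes a PCollection<String> of JSON documents.

// Sketch of the write step (step 4). "esWriteConnection" and "ProtoToJsonString"
// are placeholders, not the real names in our codebase.
resultCollection
    .apply("Proto To Json", ParDo.of(new ProtoToJsonString()))
    .apply("Writing To ElasticSearch", ElasticsearchIO.write()
        .withConnectionConfiguration(esWriteConnection)
        .withMaxBatchSize(options.getBatchSize()));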

This works fine for my test cases, but when scaling up to run on hundreds of millions of documents I don't see any increase in parallelism. I've left jobs running for 5 hours in Google Dataflow and seen rates of 50 documents per second, which is far too slow to be useful. Our current internally developed system handles 10k-20k documents per second on a single instance. We have other applications that read very quickly from ElasticSearch, so I have no reason (yet) to believe the ElasticSearch cluster is the limiting factor.

What should I be looking at to make sure that the ElasticSearch read scales up to process in parallel?

I've tried:

Increasing numWorkers (see the options sketch after this list). This temporarily increased the number of workers, but Dataflow scaled back down because the ElasticSearch read was not producing enough data to keep them busy.

Changing the batch size in the ElasticSearch read. This had no impact.
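
For reference, the worker settings from the first bullet were set roughly like this. The numbers are examples rather than my production values, and this is only a sketch of the relevant Dataflow options:

// Simplified sketch of the worker options I was adjusting. Autoscaling still
// scaled the job back down because the read step did not produce enough
// parallel work to keep the workers busy.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
options.setNumWorkers(10);     // initial worker count
options.setMaxNumWorkers(50);  // upper bound for autoscaling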

Edit: Pipeline setup code:

PCollection<String> dataCollection = pipeline.apply("Reading From ElasticSearch", ElasticsearchIO.read()
        .withConnectionConfiguration(esReadConnection)
        .withBatchSize(options.getBatchSize())
        .withScrollKeepalive(scrollTime)
        .withQuery(options.getQuery()) // the query must be a stringified JSON object that includes the top-level "query" element
        .withMetadata()
    );
dataCollection.apply("Serialize", ParDo.of(new JsonToProto<>(searchHitTag, failTag, SearchHit::newBuilder)));

Edit 2: After reading the split code link provided by @robertwb, I think I have identified the problem. The index name I am configuring is an alias (and in other uses a data stream). I suspect this section of the getEstimatedSizeBytes code does not properly account for a large number of cases:

// From ElasticsearchIO's getEstimatedSizeBytes: the stats lookup uses the literal
// configured index name as the key under "indices".
JsonNode indexStats =
    statsJson.path("indices").path(connectionConfiguration.getIndex()).path("primaries");
long indexSize = indexStats.path("store").path("size_in_bytes").asLong();

The stats query on my cluster returns a list of indexes under the indices key, none of which will have the exact name provided in the configuration.

A cleaner solution may be to use the _all object instead of the individual indices; the path for the above indexStats would then be _all.primaries, and the indexSize path would remain the same. An alternative would be to iterate through all of the indices in the indices object and total up the indexSize for each returned key.
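
To illustrate both options, here is a rough sketch against the same statsJson node from the connector code above (this is not the actual patch attached to the issue):

// Option 1: read the rolled-up primary store size from "_all", which covers
// aliases, index patterns, and data stream backing indices.
long indexSize = statsJson.path("_all").path("primaries")
    .path("store").path("size_in_bytes").asLong();

// Option 2: sum the primary store size of every index returned under "indices".
// Data stream backing indices have names like ".ds-<stream>-...", not the name
// used in the query, so they are only reachable by iterating the returned keys.
long totalSize = 0;
Iterator<Map.Entry<String, JsonNode>> indices = statsJson.path("indices").fields();
while (indices.hasNext()) {
  totalSize += indices.next().getValue()
      .path("primaries").path("store").path("size_in_bytes").asLong();
}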

In either case, it is a mistake to use connectionConfiguration.getIndex() in the path to the index stats, because for a number of reasonable use cases in ElasticSearch the index name or pattern used to query the cluster may not be the exact name of the index returned. This also prevents high performance when working with data streams in ElasticSearch, since the backing indices all have different names than the data stream name used to query the cluster.

Issue tracking the fix to Apache Beam's ElasticsearchIO connector: https://github.com/apache/beam/issues/24117

Edit 3: I've attached a patch to the issue above that resolves this: link

Answer:

There is currently an issue in Beam's ElasticsearchIO connector: it does not handle index aliases, patterns, or data streams when fetching the index stats needed by getEstimatedSizeBytes. This causes Beam not to split the read and therefore not to parallelize queries against multiple indexes. Details are in the Beam issue below, and I've provided a patch to resolve it in the comments on the ticket.

https://github.com/apache/beam/issues/24117
