ApacheBeam ElasticsearchIO is not working with latest elasticsearch

Time:04-26

I've been trying to use the ElasticsearchIO API in an Apache Beam pipeline, but I'm unable to connect to Elasticsearch. Any help would be great.

My JAR versions:

org.apache.beam:beam-sdks-java-core:2.37.0

org.apache.beam:beam-sdks-java-io-elasticsearch:2.37.0

My Elasticsearch version is 8.1.2.

The documentation I found says that only versions up to 2.x are supported.

Link: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html

Is it not possible to use the latest version of Elasticsearch? Has anyone managed to get it working with the latest version?

My Pipeline:

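import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// EndsWithFilter and DummyFileProcessor are my own classes (not shown here).
public class Test {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);

        // Poll the directory every 10 seconds for new files, without ever terminating.
        PCollection<FileIO.ReadableFile> files = pipeline.apply(
                        FileIO.match().filepattern("/path_to_files/**")
                                .continuously(
                                        Duration.standardSeconds(10),
                                        Watch.Growth.never()))
                .apply(FileIO.readMatches());

        PCollection<FileIO.ReadableFile> filteredFiles = files.apply(Filter.by(new EndsWithFilter("Dummy.txt")));

        // Process each matching file and write the resulting documents to Elasticsearch.
        filteredFiles
                .apply(ParDo.of(new DummyFileProcessor()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(new String[]{ "http://****:9200" }, "***")
                                .withUsername("***")
                                .withPassword("***")));

        pipeline.run().waitUntilFinish();
    }
}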

Here is the stack trace, which says that the Elasticsearch version cannot be retrieved:

Exception in thread "main" org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2592)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:839)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2579)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version

    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version

Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed

    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more

> Task :IO-TextIO-TextIO_Read:Test.main() FAILED

Execution failed for task ':IO-TextIO-TextIO_Read:Test.main()'.

CodePudding user response:

I just found the issue below, which reports that ElasticsearchIO doesn't work with Elasticsearch 8.*:

https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/367

Once I downgraded Elasticsearch to 7.13.2, it worked just fine.
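If it helps, a pre-flight check along these lines could flag the mismatch before the pipeline starts. This is only a sketch: EsVersionCheck, majorVersion, likelyCompatible and MAX_WORKING_MAJOR are hypothetical names, not part of Beam or the Elasticsearch client, and the cut-off at major version 7 is an assumption based solely on what worked for me with Beam 2.37.0. It parses the version.number field from the JSON that the cluster returns for GET / and does not make any network calls itself.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical pre-flight helper; not part of Beam or the Elasticsearch client. */
public class EsVersionCheck {

    // Assumption: with beam-sdks-java-io-elasticsearch 2.37.0, 7.13.2 worked
    // and 8.1.2 did not, so treat major versions above 7 as suspect.
    static final int MAX_WORKING_MAJOR = 7;

    // Matches the "number" field of the JSON body returned by GET / on the cluster.
    private static final Pattern VERSION_NUMBER =
            Pattern.compile("\"number\"\\s*:\\s*\"(\\d+)\\.(\\d+)\\.(\\d+)");

    /** Extracts the major version from the root endpoint's JSON response. */
    static int majorVersion(String rootResponseJson) {
        Matcher m = VERSION_NUMBER.matcher(rootResponseJson);
        if (!m.find()) {
            throw new IllegalArgumentException("No version number found in response");
        }
        return Integer.parseInt(m.group(1));
    }

    /** Returns true if the major version is at or below the assumed cut-off. */
    static boolean likelyCompatible(String rootResponseJson) {
        return majorVersion(rootResponseJson) <= MAX_WORKING_MAJOR;
    }

    public static void main(String[] args) {
        // Trimmed-down sample responses; a real response has more fields.
        String es8 = "{\"version\":{\"number\":\"8.1.2\"}}";
        String es7 = "{\"version\":{\"number\":\"7.13.2\"}}";
        System.out.println("8.1.2 likely compatible: " + likelyCompatible(es8));
        System.out.println("7.13.2 likely compatible: " + likelyCompatible(es7));
    }
}
```

The response body would have to be fetched separately (for example with curl against the cluster's root URL) and passed in as a string. The check only tells you that the version is outside the range I saw working; it does not explain why the connection gets closed.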
