I've been trying to use ElasticsearchIO API's in apache beam pipeline. And I'm unable to connect to elasticsearch. Any help would be great.
My JAR versions:
org.apache.beam:beam-sdks-java-core:2.37.0
org.apache.beam:beam-sdks-java-io-elasticsearch:2.37.0
My Elastic version is: 8.1.2
The documentation says, only till v2.x is supported
Can I not work with latest version of elastic? Has anyone tried and made it working with latest version?
My Pipeline:
public class Test {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);
PCollection<FileIO.ReadableFile> files = pipeline.apply(
FileIO.match().filepattern("/path_to_files/**")
.continuously(
Duration.standardSeconds(10),
Watch.Growth.never()))
.apply(FileIO.readMatches());
PCollection<FileIO.ReadableFile> filteredFiles = files.apply(Filter.by(new EndsWithFilter("Dummy.txt")));
filteredFiles.apply(ParDo.of(new DummyFileProcessor())).apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration.create(new String[]{ "http://****:9200" }, "***").withUsername("***").withPassword("***")));
pipeline.run().waitUntilFinish();
}
}
Please find the Stacktrace, which says that the the elastic search version cannot be found
Exception in thread "main" org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 13 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2592)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:839)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2579)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 more
> Task :IO-TextIO-TextIO_Read:Test.main() FAILED
Execution failed for task ':IO-TextIO-TextIO_Read:Test.main()'.
CodePudding user response:
I just found out that this below issue where ElasticsearchIO doesn't work with Elasticsearch 8.*
https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/367
Once I downgraded it to 7.13.2, it worked just fine