I am running my Flink cluster in EKS, and I need to use the WebIdentityTokenCredentialsProvider for authentication. According to the docs, there are two approaches:
Presto
I tried setting this in the Flink configuration:
presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
but it fails with this error:
Caused by: java.lang.NoSuchMethodException: com.amazonaws.auth.WebIdentityTokenCredentialsProvider.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
at java.lang.Class.getConstructor0(Class.java:3082) ~[?:1.8.0_292]
at java.lang.Class.getConstructor(Class.java:1825) ~[?:1.8.0_292]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:845) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.createAwsCredentialsProvider(PrestoS3FileSystem.java:833) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:244) ~[?:?]
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:321) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:240) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:452) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:315) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:335) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:191) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:134) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:346) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
As a result, I fell back to the Hadoop solution:
Hadoop
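For reference, my Hadoop-based setup looks roughly like the following flink-conf.yaml sketch (the bucket name is a placeholder, and it assumes the flink-s3-fs-hadoop plugin is installed under plugins/; explicitly pinning the provider may not even be necessary if the default chain resolves it):

```
state.checkpoints.dir: s3a://my-bucket/checkpoints   # hypothetical bucket
# s3a resolves credentials through the Hadoop/AWS chain; on EKS with IRSA the
# web-identity provider reads AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
```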
With Hadoop I am able to authenticate successfully, but it always writes 0-length data to the checkpoint, and I can't find an actual error. There is only a WARNING in the log:
2022-10-05 17:24:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
2022-10-05 17:24:34,287 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
I've also followed some suggestions found on Google, such as setting the parallelism to 1, but that did not help at all.
Ideally it would be nice to use Presto, but for now I am okay with either approach that fixes the checkpoint issue. Thanks!
CodePudding user response:
There are actually two separate issues here:
For s3a
The reason it is not writing checkpoints is that I am using the Apache Beam portable runner, and its --checkpointing_interval option defaults to -1, which means checkpointing is disabled. After setting this to a positive value, the issue is fixed.
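For example, with a Python Beam pipeline submitted to the Flink runner, the interval (in milliseconds) can be passed at submission time; the script name and endpoints below are placeholders:

```
# hypothetical pipeline submission; --checkpointing_interval is the relevant flag
python my_pipeline.py \
  --runner=FlinkRunner \
  --flink_master=flink-jobmanager:8081 \
  --environment_type=EXTERNAL \
  --checkpointing_interval=60000
```

With 60000, Flink triggers a checkpoint every 60 seconds instead of never (the -1 default).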
For s3p
Thanks to vignesh kumar kathiresan on the Flink Slack channel!
The solution is to configure
presto.s3.use-instance-credentials: "false"
and use the DefaultAWSCredentialsProviderChain (the default setting). The DefaultAWSCredentialsProviderChain tries multiple potential providers in turn; WebIdentityTokenCredentialsProvider is one of them and will be picked up.
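Putting it together, the relevant flink-conf.yaml entries look roughly like this sketch (the bucket name is a placeholder; it assumes the flink-s3-fs-presto plugin is installed and that EKS IRSA injects AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE into the pods):

```
state.checkpoints.dir: s3p://my-bucket/checkpoints   # hypothetical bucket
presto.s3.use-instance-credentials: "false"
# note: presto.s3.credentials-provider is deliberately NOT set, so the client
# falls back to the DefaultAWSCredentialsProviderChain, which includes
# WebIdentityTokenCredentialsProvider
```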
This works for me because I already have all my AWS resources configured correctly. For people starting from scratch: besides the S3 permissions, you would also need kms:GenerateDataKey and kms:Decrypt (for buckets encrypted with KMS).
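As an illustration, the extra KMS statement in the role's IAM policy would look something like this (the key ARN below is a placeholder):

```
{
  "Effect": "Allow",
  "Action": [
    "kms:GenerateDataKey",
    "kms:Decrypt"
  ],
  "Resource": "arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID"
}
```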