Configure Flink to use S3 checkpoints with WebIdentityTokenCredentialsProvider

I am running my Flink cluster in EKS, and I need to use the WebIdentityTokenCredentialsProvider for authentication. According to the docs, there are two approaches: Presto and Hadoop.
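For context, WebIdentityTokenCredentialsProvider reads the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables that EKS injects when the pod runs under a service account annotated with an IAM role (IRSA). A minimal sketch of that prerequisite (the names and role ARN below are placeholders):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account        # placeholder name
  namespace: flink                   # placeholder namespace
  annotations:
    # Role that the web identity token is exchanged for.
    eks.amazonaws.com/role-arn: arn:aws:iam::111122223333:role/flink-checkpoint-role

The Flink pods then reference it via serviceAccountName: flink-service-account in their pod spec.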

Presto

I tried setting this in the Flink configuration:

presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider

but it fails with the following error:

Caused by: java.lang.NoSuchMethodException: com.amazonaws.auth.WebIdentityTokenCredentialsProvider.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
    at java.lang.Class.getConstructor0(Class.java:3082) ~[?:1.8.0_292]
    at java.lang.Class.getConstructor(Class.java:1825) ~[?:1.8.0_292]
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:845) ~[?:?]
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.createAwsCredentialsProvider(PrestoS3FileSystem.java:833) ~[?:?]
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:244) ~[?:?]
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:321) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:240) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:452) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:315) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:335) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:191) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:134) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:346) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
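From the stack trace, PrestoS3FileSystem loads a custom provider reflectively through a (java.net.URI, org.apache.hadoop.conf.Configuration) constructor, which WebIdentityTokenCredentialsProvider does not have. One possible workaround is a thin wrapper that exposes that constructor; this is an untested sketch (the class and package names are my own invention, and it would have to be built against the same, possibly shaded, AWS classes that the flink-s3-fs-presto plugin bundles):

package com.example.auth; // hypothetical package

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;

// Exposes the (URI, Configuration) constructor that
// PrestoS3FileSystem.getCustomAWSCredentialsProvider requires,
// then delegates to the env-var-driven web identity provider.
public class WebIdentityWrappingProvider implements AWSCredentialsProvider {

    private final AWSCredentialsProvider delegate;

    public WebIdentityWrappingProvider(URI uri, Configuration conf) {
        // Builds from AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE env vars.
        this.delegate = WebIdentityTokenCredentialsProvider.builder().build();
    }

    @Override
    public AWSCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }
}

With that on the plugin classpath, presto.s3.credentials-provider would point at com.example.auth.WebIdentityWrappingProvider instead.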

As a result, I fell back to the Hadoop solution:

Hadoop

However, with Hadoop I can authenticate successfully, but it always writes 0-length data to the checkpoint, and I can't find an actual error. There is only a warning in the log:

2022-10-05 17:24:34,285 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
2022-10-05 17:24:34,287 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
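For reference, "expired before completing" means the checkpoint did not finish within Flink's checkpoint timeout. These are the flink-conf.yaml knobs that govern it (example values, not my exact configuration):

execution.checkpointing.interval: 60s
execution.checkpointing.timeout: 10min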

I've also followed a suggestion I found on Google and set the parallelism to 1, but that did not help at all.

Ideally it would be nice to use Presto, but for now I am okay with either approach, as long as it resolves the checkpoint issue. Thanks!

CodePudding user response:

There are actually two separate issues:

For s3a

The reason it was not writing checkpoints is that I am using the Apache Beam portable runner, whose --checkpointing_interval option defaults to -1, which means checkpointing is disabled. After setting it to a non-zero value, the issue was fixed.
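For example, when submitting a Python pipeline to the portable runner, the interval (in milliseconds) is passed as a pipeline option; the script name and job endpoint below are placeholders:

python my_pipeline.py \
    --runner=PortableRunner \
    --job_endpoint=localhost:8099 \
    --checkpointing_interval=60000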

For s3p

Thanks to vignesh kumar kathiresan on the Flink Slack channel!

The solution is to configure

presto.s3.use-instance-credentials: "false"

and to use the DefaultAWSCredentialsProviderChain (the default setting). The DefaultAWSCredentialsProviderChain tries multiple potential providers in turn; WebIdentityTokenCredentialsProvider is one of them and gets picked up.
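Put together, a minimal flink-conf.yaml sketch for the Presto path (the bucket name is a placeholder; note that presto.s3.credentials-provider is deliberately not set, so the default chain is used):

state.checkpoints.dir: s3p://my-checkpoint-bucket/checkpoints
presto.s3.use-instance-credentials: "false"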

This works for me given that all of my AWS settings are configured correctly. But for people starting from scratch: besides the S3 permissions, you would also need kms:GenerateDataKey and kms:Decrypt.
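A sketch of the IAM policy attached to the web identity role (the bucket name, KMS key ARN, and the exact S3 action list are assumptions; adjust them to your setup):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-checkpoint-bucket",
        "arn:aws:s3:::my-checkpoint-bucket/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["kms:GenerateDataKey", "kms:Decrypt"],
      "Resource": "arn:aws:kms:us-east-1:111122223333:key/example-key-id"
    }
  ]
}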
