I'm trying to stream some data from Kafka into S3 (using the s3a protocol).
The pipeline works well for an hour, but after an hour (the same as my token expiry setting for AWS) the StreamingFileSink throws:
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken; Request ID: 7YFGVQ92YT51DP0K; S3 Extended Request ID: sx6UJJ548o0wpwJbkoWJ16jKRVih3ZV9XQdbThNhq5kUU7A7yCx58tcCGELVs5tqGWaMMPfZxZM=; Proxy: webproxy)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
...
I'm using an AWSCredentialsProvider implementation whose getCredentials method
refreshes the token every 15 minutes with a newly parsed secret from AWS.
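The refresh logic is roughly equivalent to the sketch below. It is a simplified, stdlib-only model rather than my real provider: RefreshingTokenCache, the fetcher and the injectable time source are hypothetical names used for illustration, and in the actual job this logic sits behind AWSCredentialsProvider.getCredentials.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical helper: caches a token and re-fetches it once it is older than maxAge.
final class RefreshingTokenCache {
    private final Supplier<String> fetcher;   // assumption: reads a fresh secret from AWS
    private final Duration maxAge;            // 15 minutes in my setup
    private final Supplier<Instant> timeSource;
    private String cached;
    private Instant fetchedAt;

    RefreshingTokenCache(Supplier<String> fetcher, Duration maxAge, Supplier<Instant> timeSource) {
        this.fetcher = fetcher;
        this.maxAge = maxAge;
        this.timeSource = timeSource;
    }

    // Returns the cached token, fetching a new one if none exists or it is too old.
    synchronized String get() {
        Instant now = timeSource.get();
        if (cached == null || Duration.between(fetchedAt, now).compareTo(maxAge) >= 0) {
            cached = fetcher.get();
            fetchedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        Instant[] now = { Instant.parse("2024-01-01T00:00:00Z") };
        int[] fetches = { 0 };
        RefreshingTokenCache cache = new RefreshingTokenCache(
                () -> "token-" + (++fetches[0]), Duration.ofMinutes(15), () -> now[0]);
        System.out.println(cache.get()); // token-1
        now[0] = now[0].plus(Duration.ofMinutes(15));
        System.out.println(cache.get()); // token-2
    }
}
```

A refresh like this only helps if whoever holds the provider calls it again for later requests.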
My assumption is that the issue lies in how I initialise the StreamingFileSink
in the job itself:
    StreamExecutionEnvironment env = getStreamExecutionEnvironment();

    StreamingFileSink<FELEvent> sink = StreamingFileSink
        .forBulkFormat(<my Path Settings with basePath s3a://bucket/path/to/dir>)
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withNewBucketAssigner(<My custom bucket assigner>)
        .build();

    env.fromSource(<Kafka source>)
        .map(<Some operation>)
        .filter(<Some filtering>)
        .addSink(sink)
        .name("name").uid("uid");

    env.execute("TAG");
Any ideas whether the plugin refreshes tokens for an already initialised StreamingFileSink? If not, what's the best way to handle this scenario?
(Because of compatibility issues with ZooKeeper, I'm using Flink 1.14.3.)
Edit:
I checked the hadoop-fs plugin code, and it seems to initialise an S3 object with the provided (read) token only once, when the FileSink is initialised. I'm looking for a way to re-initialise it.
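To illustrate what I suspect is happening, here is a toy model. It is not the plugin's actual code: SnapshotClient and ProviderClient are hypothetical classes that only contrast copying a token once at construction with asking the provider on every request.

```java
import java.util.function.Supplier;

// Toy model (assumption, not hadoop-fs code): two ways a client can hold credentials.
final class TokenSnapshotDemo {

    // Hypothetical client that copies the token value once, at construction time.
    static final class SnapshotClient {
        private final String token;
        SnapshotClient(Supplier<String> provider) { this.token = provider.get(); }
        String tokenForRequest() { return token; }
    }

    // Hypothetical client that keeps the provider and resolves the token per request.
    static final class ProviderClient {
        private final Supplier<String> provider;
        ProviderClient(Supplier<String> provider) { this.provider = provider; }
        String tokenForRequest() { return provider.get(); }
    }

    public static void main(String[] args) {
        String[] current = { "token-A" };
        Supplier<String> provider = () -> current[0];

        SnapshotClient snapshot = new SnapshotClient(provider);
        ProviderClient live = new ProviderClient(provider);

        current[0] = "token-B"; // the provider rotates the token

        System.out.println(snapshot.tokenForRequest()); // token-A (stale)
        System.out.println(live.tokenForRequest());     // token-B (fresh)
    }
}
```

If the sink behaves like the snapshot variant, refreshing inside my provider would never reach the S3 client, which would match the failure after exactly one hour.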
Answer:
Setting
fs.s3a.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider
in the job manager properties, and setting the environment variable AWS_PROFILES to a valid AWS profile (such as /.aws/config), fixes the issue.
Make sure that you're refreshing your tokens.
More details: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html