I have an Apache Flink application that I have deployed on Kinesis Data analytics.
This application reads from Kafka and writes to S3. The S3 bucket structure it writes to is computed using a BucketAssigner.A stripped down version of the BucketAssigner here
The problem I have is, let us say we have to write to this directory structure: s3://myBucket/folder1/folder2/folder3/myFile.json
Before making the PUT
request, it makes a the following HEAD
requests:
HEAD /folder1
HEAD /folder1/folder2
HEAD /folder1/folder2/folder3/
And then it makes the PUT
request.
It is doing it for each and every request, which is causing S3 rate limiting and there by backpressure in my FLink application.
I found that someone had a similar issue with BucketingSink: https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz
The solution mentioned there was to switch to StreamingFileSink which is what I am doing .
Any ideas on how to fix this in StreamingFileSink?
My SinkConfig is as follows:
StreamingFileSink
.forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
.withBucketAssigner(bucketAssigner)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(60000)
.build())
.build()
The JsonEncoder takes the object and converts it to json and writes out bytes like this
I have described more details about how the whole pipeline works in this question if that helps in anyway: Heavy back pressure and huge checkpoint size
CodePudding user response:
The Hadoop S3 file system tries to imitate a filesystem on top of S3. This means that:
- before writing a key it checks if the "parent directory" exists by checking for a key with the prefix up to the last "/"
- it creates empty marker files to mark the existence of such a parent directory
- all these "existence" requests are S3 HEAD requests which are both expensive and start to violate consistent read-after-create visibility
As a result, the Hadoop S3 file system has very high "create file" latency and it hits request rate limits very quickly (HEAD requests have very low request rate limits on S3). As a consequence, it's best to find ways to write to fewer distinct files.
You might also explore using entropy injection. Entropy injection is happening at the file system level, so it should work with the FileSink. Except I'm not sure how it will interact with the partitioning/bucketing being done by the sink, so you may or may not find it useable in practice. If you try it, please report back!