How can I create "two-layer" bucket directories using Flink FileSink?

Time:09-27

I am using Flink FileSink to sink my data to s3 file storage.

I need my data structured into subdirectories like "s3://mybucket/dt=20210926/hour=13/".

So far I have managed to write my data into custom buckets using DateTimeBucketAssigner, as follows:

FileSink<RowData> orcSink = FileSink
            .forBulkFormat(new Path("s3a://mybucket/flink_file_sink_orc_test"), factory)
            .withBucketAssigner(new DateTimeBucketAssigner<>("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
            .build();

By doing this I was able to create buckets like "s3://mybucket/dt=20210926".

I am struggling to create buckets like "s3://mybucket/dt=yyyyMMdd/hour=HH". Need help! Thanks!

CodePudding user response:

If you need custom data partitioning, I think you should implement a custom BucketAssigner, similar to the built-in DateTimeBucketAssigner, that partitions the data the way you need.

Here is an example of a bucket assigner for your case:

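import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateTimeHourBucketAssigner<IN> implements BucketAssigner<IN, String> {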

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_DATE_FORMAT_STRING = "yyyyMMdd";
    private static final String DEFAULT_HOUR_FORMAT_STRING = "HH";

    private final String dateFormatString;
    private final String hourFormatString;

    private final ZoneId zoneId;

    private transient DateTimeFormatter dateTimeFormatter;
    private transient DateTimeFormatter hourFormatter;

    public DateTimeHourBucketAssigner() {
        this(DEFAULT_DATE_FORMAT_STRING, DEFAULT_HOUR_FORMAT_STRING);
    }

    public DateTimeHourBucketAssigner(String dateFormatString, String hourFormatString) {
        this(dateFormatString, hourFormatString, ZoneId.systemDefault());
    }

    public DateTimeHourBucketAssigner(ZoneId zoneId) {
        this(DEFAULT_DATE_FORMAT_STRING, DEFAULT_HOUR_FORMAT_STRING, zoneId);
    }

    public DateTimeHourBucketAssigner(String dateFormatString, String hourFormatString, ZoneId zoneId) {
        this.dateFormatString = Preconditions.checkNotNull(dateFormatString);
        this.hourFormatString = Preconditions.checkNotNull(hourFormatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // The formatters are transient, so they are created lazily on first use.
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(dateFormatString).withZone(zoneId);
        }
        if (hourFormatter == null) {
            hourFormatter = DateTimeFormatter.ofPattern(hourFormatString).withZone(zoneId);
        }
        // Buckets are assigned by processing time, not by event time.
        Instant time = Instant.ofEpochMilli(context.currentProcessingTime());
        return String.format(
                "dt=%s/hour=%s",
                dateTimeFormatter.format(time),
                hourFormatter.format(time)
        );
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "DateTimeHourBucketAssigner{"
                + "dateFormatString='"
                + dateFormatString
                + '\''
                + ", hourFormatString='"
                + hourFormatString
                + '\''
                + ", zoneId="
                + zoneId
                + '}';
    }
}

CodePudding user response:

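DateTimeBucketAssigner can already meet your needs; only the format string passed to it needs to be adjusted. Text inside single quotes is treated as a literal, and the "/" in the pattern becomes a directory separator in the bucket path: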

FileSink<RowData> orcSink = FileSink
            .forBulkFormat(new Path("s3a://mybucket/flink_file_sink_orc_test"), factory)
            .withBucketAssigner(new DateTimeBucketAssigner<>("'dt='yyyyMMdd/'hour='HH", ZoneId.of("Asia/Shanghai")))
            .build();
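
To check what that format string produces without starting a Flink job, the pattern can be tried with plain java.time. This is only a sketch: it assumes DateTimeBucketAssigner hands its format string to java.time.format.DateTimeFormatter with the given zone, and BucketPatternDemo and bucketId are hypothetical names used for illustration, not part of Flink.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical demo class, not part of Flink: it only shows what the pattern produces.
public class BucketPatternDemo {

    // Formats an instant with the given pattern in the given time zone.
    public static String bucketId(String pattern, Instant time, ZoneId zoneId) {
        return DateTimeFormatter.ofPattern(pattern).withZone(zoneId).format(time);
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        // 2021-09-26 13:30 local time in Shanghai
        Instant time = ZonedDateTime.of(2021, 9, 26, 13, 30, 0, 0, shanghai).toInstant();

        // 'dt=' and 'hour=' are quoted literals; '/' is copied through unchanged.
        System.out.println(bucketId("'dt='yyyyMMdd/'hour='HH", time, shanghai));
        // prints dt=20210926/hour=13
    }
}
```

Because the bucket id contains a "/", the sink should end up writing under ".../dt=20210926/hour=13/" below the base path given to forBulkFormat.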