I am using Flink FileSink to sink my data to S3 file storage.
I need my data structured into subdirectories like "s3://mybucket/dt=20210926/hour=13/".
So far I have managed to write my data into customized buckets using DateTimeBucketAssigner as follows:
FileSink<RowData> orcSink = FileSink
    .forBulkFormat(new Path("s3a://mybucket/flink_file_sink_orc_test"), factory)
    .withBucketAssigner(new DateTimeBucketAssigner<>("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
    .build();
By doing this I was able to create buckets like "s3://mybucket/dt=20210926".
However, I am struggling to create buckets like "s3://mybucket/dt=yyyyMMdd/hour=HH". Need help! Thanks!
CodePudding user response:
If you need custom data partitioning, I think you should implement a custom BucketAssigner, similar to the built-in DateTimeBucketAssigner, that partitions the data based on your needs.
Here is an example of such a bucket assigner for your case:
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Assigns each element to a bucket of the form "dt=yyyyMMdd/hour=HH",
 * derived from the current processing time.
 */
public class DateTimeHourBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_DATE_FORMAT_STRING = "yyyyMMdd";
    private static final String DEFAULT_HOUR_FORMAT_STRING = "HH";

    private final String dateFormatString;
    private final String hourFormatString;
    private final ZoneId zoneId;

    // DateTimeFormatter is not serializable, so the formatters are created lazily on the task managers.
    private transient DateTimeFormatter dateTimeFormatter;
    private transient DateTimeFormatter hourFormatter;

    public DateTimeHourBucketAssigner() {
        this(DEFAULT_DATE_FORMAT_STRING, DEFAULT_HOUR_FORMAT_STRING);
    }

    public DateTimeHourBucketAssigner(String dateFormatString, String hourFormatString) {
        this(dateFormatString, hourFormatString, ZoneId.systemDefault());
    }

    public DateTimeHourBucketAssigner(ZoneId zoneId) {
        this(DEFAULT_DATE_FORMAT_STRING, DEFAULT_HOUR_FORMAT_STRING, zoneId);
    }

    public DateTimeHourBucketAssigner(String dateFormatString, String hourFormatString, ZoneId zoneId) {
        this.dateFormatString = Preconditions.checkNotNull(dateFormatString);
        this.hourFormatString = Preconditions.checkNotNull(hourFormatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(dateFormatString).withZone(zoneId);
        }
        if (hourFormatter == null) {
            hourFormatter = DateTimeFormatter.ofPattern(hourFormatString).withZone(zoneId);
        }
        Instant time = Instant.ofEpochMilli(context.currentProcessingTime());
        return String.format(
                "dt=%s/hour=%s",
                dateTimeFormatter.format(time),
                hourFormatter.format(time));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "DateTimeHourBucketAssigner{"
                + "dateFormatString='" + dateFormatString + '\''
                + ", hourFormatString='" + hourFormatString + '\''
                + ", zoneId=" + zoneId
                + '}';
    }
}
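You can then plug this assigner into the sink in place of DateTimeBucketAssigner. A minimal sketch, reusing the bucket path and the `factory` writer from your own snippet:

DateTimeHourBucketAssigner<RowData> assigner =
    new DateTimeHourBucketAssigner<>("yyyyMMdd", "HH", ZoneId.of("Asia/Shanghai"));

FileSink<RowData> orcSink = FileSink
    .forBulkFormat(new Path("s3a://mybucket/flink_file_sink_orc_test"), factory)
    // the "dt=" and "hour=" prefixes are added inside getBucketId(), so the
    // patterns here only describe the date and hour parts
    .withBucketAssigner(assigner)
    .build();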
CodePudding user response:
DateTimeBucketAssigner can already meet your needs; only the format string passed to it needs to be adjusted:
FileSink<RowData> orcSink = FileSink
    .forBulkFormat(new Path("s3a://mybucket/flink_file_sink_orc_test"), factory)
    .withBucketAssigner(new DateTimeBucketAssigner<>("'dt='yyyyMMdd/'hour='HH", ZoneId.of("Asia/Shanghai")))
    .build();