Home > Mobile >  How to use sinkTo in Flink to write multi file in AWS S3
How to use sinkTo in Flink to write multi file in AWS S3

Time:10-06

i want to save data of many customers in different files.

Now, data is being stored in DataStream variable. How to use sinkTo to write multi file in AWS S3. Each file contains the data of a customer and the file name is the customer name.

public static void writeMultiFile(DataStream<Tuple5<String, Long, Double, String, String>> data) throws Exception {
    String currentDir = System.getProperty("user.dir");
    Path pathNew = new Path(currentDir   "/output/");

    OutputFileConfig config = OutputFileConfig
            .builder()
            .withPartPrefix("namefile")
            .withPartSuffix(".parquet")
            .build();

    final FileSink<GenericRecord> sink = FileSink
            .forBulkFormat(pathNew, AvroParquetWriters.forGenericRecord(schema))
            .withOutputFileConfig(config)
            .build();

    data.keyBy(value->value.f0).map(new convertGenericRecord()).sinkTo(sink);
}

I need the file names to change by key. The code above can't do that, filename must be predefined, it cannot be changed dynamically by key.

Please help me!

CodePudding user response:

You can do this by implementing a BucketAssigner.

Something along these lines:

public static final class KeyBucketAssigner
        implements BucketAssigner<Event, String> {

    private static final long serialVersionUID = 987325769970523326L;

    @Override
    public String getBucketId(final Event element, final Context context) {
        return String.valueOf(Event.key);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

documentation
javadocs

  • Related