Home > Software design >  Apache Flink - writing stream to S3 error - null uri host
Apache Flink - writing stream to S3 error - null uri host

Time:05-16

I have a Flink data pipeline that transforms the log file downloaded from S3 and write back in parquet file format to another S3 bucket. I have configured the S3 key & secret in flink-conf.yaml with the

s3.access-key: "key"
s3.secret-key: "secret"

Additionally copied the flink-s3-fs-hadoop-1.15.0.jar & aws-java-sdk-1.12.217.jar to FLINK_HOME/plugins/s3-fs-presto directory.

When the job is submitted to the cluster using flink run command, I get the below exception

Caused by: java.io.IOException: null uri host.
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    ... 35 more
Caused by: java.lang.NullPointerException: null uri host.
    at java.base/java.util.Objects.requireNonNull(Objects.java:246)
    at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    ... 41 more

The code to write the data to s3 is below

public static FileSink<Recc> getFileSink() {
        Item<Recc> item = new Item<>(Recc.class);
        ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
        Path path = new Path("s3a:", "log-backup/", "vvra/logs"); //s3 also tried
        return FileSink.forBulkFormat(
                path,
                factory)
                .build();
    }

I even tried the minio but still I get the same error. How do resolve this? What more has to be configured?

CodePudding user response:

The problem was : (colon). It was not clearly documented

CodePudding user response:

You just need to remove the colon ":" in the scheme name. As the JavaDocs of java.net.URI state:

... If a scheme is given then it is appended to the result, followed by a colon character (':'). ...

org.apache.flink.core.fs.Path relies on java.net.URI under the hood.

  • Related