I have a Flink data pipeline that transforms the log file downloaded from S3 and writes it back in Parquet file format to another S3 bucket. I have configured the S3 key and secret in flink-conf.yaml with:
s3.access-key: "key"
s3.secret-key: "secret"
Additionally, I copied flink-s3-fs-hadoop-1.15.0.jar and aws-java-sdk-1.12.217.jar to the FLINK_HOME/plugins/s3-fs-presto directory.
When the job is submitted to the cluster using the flink run command, I get the exception below:
Caused by: java.io.IOException: null uri host.
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    ... 35 more
Caused by: java.lang.NullPointerException: null uri host.
    at java.base/java.util.Objects.requireNonNull(Objects.java:246)
    at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    ... 41 more
The code that writes the data to S3 is below:
public static FileSink<Recc> getFileSink() {
    Item<Recc> item = new Item<>(Recc.class);
    ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
    Path path = new Path("s3a:", "log-backup/", "vvra/logs"); // s3 also tried
    return FileSink.forBulkFormat(path, factory).build();
}
I even tried MinIO, but I still get the same error. How do I resolve this? What else has to be configured?
CodePudding user response:
The problem was the : (colon). It was not clearly documented.
CodePudding user response:
You just need to remove the colon ":" from the scheme name. As the JavaDocs of java.net.URI state:
... If a scheme is given then it is appended to the result, followed by a colon character (':'). ...
org.apache.flink.core.fs.Path relies on java.net.URI under the hood, so a scheme passed as "s3a:" ends up followed by a second colon, and the resulting URI has no host.
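To see the effect in isolation, here is a minimal sketch that uses only java.net.URI, with no Flink on the classpath. It assumes Path hands the scheme to the multi-argument URI constructor unchanged; the class name SchemeColonDemo and the helper build are made up for illustration, and the bucket and path values are simplified from the question (no trailing slash on the bucket, leading slash on the path).

```java
import java.net.URI;
import java.net.URISyntaxException;

public class SchemeColonDemo {

    // Hypothetical helper (not part of Flink): builds a URI from its parts
    // with the five-argument java.net.URI constructor, which appends ':'
    // after the scheme on its own.
    static URI build(String scheme, String authority, String path) {
        try {
            return new URI(scheme, authority, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Scheme with a trailing colon, as in the question.
        URI withColon = build("s3a:", "log-backup", "/vvra/logs");
        // Scheme without the colon, as suggested in the answer.
        URI withoutColon = build("s3a", "log-backup", "/vvra/logs");

        // The extra colon makes the URI opaque, so it has no host.
        System.out.println("with colon:    host = " + withColon.getHost());
        // Without the colon the bucket name is parsed as the host.
        System.out.println("without colon: host = " + withoutColon.getHost());
    }
}
```

The first line prints a null host, which matches the "null uri host" message in the stack trace; the second prints log-backup. In the question's code the corresponding change would be to pass "s3a" instead of "s3a:" as the first argument to Path.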