I have a bucket on AWS S3 and I am trying to publish the contents of some files in it to a local Kafka cluster running in Docker.
I go to Control Center and create a new topic (mysql-employees):
partitions: 6
min.insync.replicas: 1
cleanup.policy: delete
retention.ms: 604800000
max.message.bytes: 1048588
retention.bytes: -1
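The equivalent topic could also be created programmatically instead of through the Control Center UI; a minimal sketch with the confluent-kafka Python client, assuming the broker is reachable on localhost:9092 (just a guess for this Docker setup):

from confluent_kafka.admin import AdminClient, NewTopic

# localhost:9092 is an assumption; use whatever port the Docker broker actually exposes.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

topic = NewTopic(
    "mysql-employees",
    num_partitions=6,
    replication_factor=1,
    config={
        "min.insync.replicas": "1",
        "cleanup.policy": "delete",
        "retention.ms": "604800000",
        "max.message.bytes": "1048588",
        "retention.bytes": "-1",
    },
)

# create_topics() returns a dict of topic name -> future; result() raises if creation failed.
for name, future in admin.create_topics([topic]).items():
    future.result()
    print(f"Created topic {name}")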
The next step is to create the actual connector by providing it the following configuration:
{
  "name": "S3SourceConnectorConnectorLatest_0",
  "config": {
    "name": "S3SourceConnectorConnectorSayin_0",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "topic.regex.list": "mysql-employees:.\\.json",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.bucket.name": "analytics-s3-kafka-test",
    "s3.region": "eu-west-1",
    "aws.access.key.id": "**********************",
    "aws.secret.access.key": "****************************************"
  }
}
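The same configuration can also be submitted through the Connect REST API instead of Control Center; a minimal sketch in Python, assuming the worker's REST interface is on its default port, localhost:8083:

import requests

# localhost:8083 is the default Connect REST port; adjust for the Docker setup.
connector = {
    "name": "S3SourceConnectorConnectorLatest_0",
    "config": {
        "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
        "topic.regex.list": "mysql-employees:.\\.json",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "s3.bucket.name": "analytics-s3-kafka-test",
        "s3.region": "eu-west-1",
        "aws.access.key.id": "<access-key>",
        "aws.secret.access.key": "<secret-key>",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
print(resp.json())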
Right after launching it, I get the following error in the Connect container logs:
[2022-09-02 12:29:24,365] ERROR WorkerSourceTask{id=S3SourceConnectorConnectorLatest_1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Error while executing read with record co-ordinates : RecordCoordinates [storagePartition=topics/mysql-employees/partition=0/, startOffset=0, endOffset=1221]
at io.confluent.connect.cloud.storage.errorhandler.handlers.ReThrowErrorHandler.handle(ReThrowErrorHandler.java:21)
at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:69)
at io.confluent.connect.cloud.storage.source.StorageSourceTask.poll(StorageSourceTask.java:161)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:307)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
at io.confluent.connect.cloud.storage.source.format.CloudStorageJsonFormat.extractWithRestore(CloudStorageJsonFormat.java:73)
at io.confluent.connect.cloud.storage.source.format.CloudStorageJsonFormat.extractRecord(CloudStorageJsonFormat.java:66)
at io.confluent.connect.cloud.storage.source.StorageObjectFormat.nextRecord(StorageObjectFormat.java:72)
at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:65)
... 10 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
... 14 more
I really have no idea what the reason could be. The bucket content is as follows (as you can see, I am kind of guessing at the expected folder hierarchy):
This is what my S3 JSON file looks like:
[
  {
    "isActive": 1,
    "address": "8685 Hand Vista, Suite 627, 22274, Jacobibury, Massachusetts, United States",
    "totalNumberOfItemsInside": 7,
    "lastStatusSync": "2021-06-16T07:38:34Z",
    "lat": 42.409804,
    "lon": -71.378739
  },
  {
    "isActive": 1,
    "address": "409 Charles Bancroft Hwy, Litchfield, NH 03052, United States",
    "totalNumberOfItemsInside": 5,
    "lastStatusSync": "2021-06-16T07:38:34Z",
    "lat": 42.871732,
    "lon": -71.473854
  },
  {
    "isActive": 0,
    "address": "41 Marion St, Brookline, MA 02446, United States",
    "totalNumberOfItemsInside": 4,
    "lastStatusSync": "2021-06-16T07:38:34Z",
    "lat": 42.340292,
    "lon": -71.122403
  },
  {
    "isActive": 1,
    "address": "86 S Swan St, Albany, NY 12210, United States",
    "totalNumberOfItemsInside": 0,
    "lastStatusSync": "2021-06-16T07:38:34Z",
    "lat": 42.653673,
    "lon": -73.759808
  },
  {
    "isActive": 0,
    "address": "140 Fair St, Northampton, MA 01060, United States",
    "totalNumberOfItemsInside": 2,
    "lastStatusSync": "2021-06-16T07:38:34Z",
    "lat": 42.325064,
    "lon": -72.613797
  }
]
CodePudding user response:
To read a JSON file from S3 with this connector, the file must contain line-delimited JSON objects, each on its own line, not an expanded/indented array.
The documentation explaining the supported object formats is here: https://docs.confluent.io/kafka-connectors/s3-source/current/generalized/overview.html#s3-object-formats
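For example, a pretty-printed array like the one in the question could be rewritten into line-delimited JSON with a short Python script (file names here are hypothetical placeholders for the S3 object and its replacement):

import json

# "employees.json" is the pretty-printed array shown in the question;
# "employees.jsonl" is the line-delimited version the connector expects.
with open("employees.json") as src:
    records = json.load(src)

with open("employees.jsonl", "w") as dst:
    for record in records:
        dst.write(json.dumps(record) + "\n")

Each line of the resulting file is one compact JSON object; once the rewritten file is uploaded back to the bucket, the connector should be able to deserialize every line as a separate record.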