S3 source connector for Kafka Connect unable to read bucket content


I have a bucket on AWS S3, and I am trying to publish the contents of some files in it to a local Kafka cluster running in Docker.

I go to Control Center and create a new topic (mysql-employees) with the following settings:

partitions: 6
min.insync.replicas: 1
cleanup.policy: delete
retention.ms: 604800000
max.message.bytes: 1048588
retention.bytes: -1

The next step is to create the actual connector with the following configuration:

{
  "name": "S3SourceConnectorConnectorLatest_0",
  "config": {
    "name": "S3SourceConnectorConnectorSayin_0",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "topic.regex.list": "mysql-employees:.\\.json",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.bucket.name": "analytics-s3-kafka-test",
    "s3.region": "eu-west-1",
    "aws.access.key.id": "**********************",
    "aws.secret.access.key": "****************************************"
  }
}
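
(For reference, creating the connector this way is equivalent to posting the same JSON to the Kafka Connect REST API. Here is a minimal Python sketch of that, assuming the config above is saved as s3-source-connector.json and the Connect worker's REST API is reachable at http://localhost:8083 — both of those are just placeholders for my setup:)

import json
import requests  # third-party HTTP client (pip install requests)

# Load the connector configuration shown above (hypothetical file name).
with open("s3-source-connector.json") as f:
    connector = json.load(f)

# POST it to the Connect worker's REST API to create the connector.
resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
print(resp.json())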

Right after launching it, I get the following error in the Connect container logs:

[2022-09-02 12:29:24,365] ERROR WorkerSourceTask{id=S3SourceConnectorConnectorLatest_1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Error while executing read with record co-ordinates : RecordCoordinates [storagePartition=topics/mysql-employees/partition=0/, startOffset=0, endOffset=1221]
        at io.confluent.connect.cloud.storage.errorhandler.handlers.ReThrowErrorHandler.handle(ReThrowErrorHandler.java:21)
        at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:69)
        at io.confluent.connect.cloud.storage.source.StorageSourceTask.poll(StorageSourceTask.java:161)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:307)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
        at io.confluent.connect.cloud.storage.source.format.CloudStorageJsonFormat.extractWithRestore(CloudStorageJsonFormat.java:73)
        at io.confluent.connect.cloud.storage.source.format.CloudStorageJsonFormat.extractRecord(CloudStorageJsonFormat.java:66)
        at io.confluent.connect.cloud.storage.source.StorageObjectFormat.nextRecord(StorageObjectFormat.java:72)
        at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:65)
        ... 10 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message to JSON in topic 
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
        ... 14 more

I really have no idea what the reason could be. The bucket content is as follows (as you can see, I am somewhat guessing at the expected folder hierarchy):

[screenshot of the S3 bucket contents]

and this is what my S3 JSON file looks like:

[
   {
      "isActive":1,
      "address":"8685 Hand Vista, Suite 627, 22274, Jacobibury, Massachusetts, United States",
      "totalNumberOfItemsInside":7,
      "lastStatusSync":"2021-06-16T07:38:34Z",
      "lat":42.409804,
      "lon":-71.378739
   },
   {
      "isActive":1,
      "address":"409 Charles Bancroft Hwy, Litchfield, NH 03052, United States",
      "totalNumberOfItemsInside":5,
      "lastStatusSync":"2021-06-16T07:38:34Z",
      "lat":42.871732,
      "lon":-71.473854
   },
   {
      "isActive":0,
      "address":"41 Marion St, Brookline, MA 02446, United States",
      "totalNumberOfItemsInside":4,
      "lastStatusSync":"2021-06-16T07:38:34Z",
      "lat":42.340292,
      "lon":-71.122403
   },
   {
      "isActive":1,
      "address":"86 S Swan St, Albany, NY 12210, United States",
      "totalNumberOfItemsInside":0,
      "lastStatusSync":"2021-06-16T07:38:34Z",
      "lat":42.653673,
      "lon":-73.759808
   },
   {
      "isActive":0,
      "address":"140 Fair St, Northampton, MA 01060, United States",
      "totalNumberOfItemsInside":2,
      "lastStatusSync":"2021-06-16T07:38:34Z",
      "lat":42.325064,
      "lon":-72.613797
   }
]

CodePudding user response:

For the connector to read a JSON file from S3, the file must contain line-delimited JSON: one JSON object per line, not a pretty-printed/indented array.
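
For example, the sample file in the question would need to look something like this (each record as a single compact object on its own line, with no enclosing array; the remaining records follow the same pattern):

{"isActive":1,"address":"8685 Hand Vista, Suite 627, 22274, Jacobibury, Massachusetts, United States","totalNumberOfItemsInside":7,"lastStatusSync":"2021-06-16T07:38:34Z","lat":42.409804,"lon":-71.378739}
{"isActive":1,"address":"409 Charles Bancroft Hwy, Litchfield, NH 03052, United States","totalNumberOfItemsInside":5,"lastStatusSync":"2021-06-16T07:38:34Z","lat":42.871732,"lon":-71.473854}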

Documentation explaining the supported formats is here: https://docs.confluent.io/kafka-connectors/s3-source/current/generalized/overview.html#s3-object-formats
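
If it helps, here is a minimal Python sketch for converting a pretty-printed JSON array into line-delimited records before uploading to S3 (the file names employees.json and employees.jsonl are just placeholders):

import json

# Read the original file, which holds a single pretty-printed JSON array.
with open("employees.json") as src:
    records = json.load(src)

# Write one compact JSON object per line (line-delimited JSON).
with open("employees.jsonl", "w") as dst:
    for record in records:
        dst.write(json.dumps(record) + "\n")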
