I am sending nested JSON data through Kafka to a PostgreSQL sink. I am building the sink connector, and unfortunately I can't change the data at the source. I want to send the data as-is, without any conversions, using Kafka.
Kafka Connect is showing this error:
[2023-01-04 22:58:15,227] ERROR WorkerSinkTask{id=Kafkapgsink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value schema must be of type Struct (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
My Kafka Connect worker properties are:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
The sink connector properties are:
name=Kafkapgsink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
task.max=100
connection.url=jdbc:postgresql://localhost:5432/fileintegrity
connection.user=postgres
connection.password=Mysore@123
insert.mode=insert
auto.create=true
auto.evolve=true
table.name.format=fileeventslog
pk.mode=record_key
delete.enabled=true
Answer:
Your problem is here:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Strings do not have typed key/value pairs (i.e. structure), which is exactly what the JDBC sink needs in order to map record fields to table columns, hence the "Value schema must be of type Struct" error.
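If your messages can carry an embedded schema, a minimal sketch of the worker converter settings that would give the JDBC sink the Struct it needs would be (replacing the StringConverter lines above, and assuming both keys and values are JSON):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true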
More details here if you want to use JSON: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
As you see there, schemas.enable is only a property of JsonConverter, so setting it on StringConverter has no effect.
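With schemas.enable=true, JsonConverter expects every message to be a schema/payload envelope. A hypothetical record for the fileeventslog table might look like this (the field names id and name are made up for illustration; they are not from your data):

{
  "schema": {
    "type": "struct",
    "name": "fileeventslog",
    "optional": false,
    "fields": [
      { "type": "string", "field": "id", "optional": false },
      { "type": "string", "field": "name", "optional": true }
    ]
  },
  "payload": {
    "id": "1",
    "name": "example.txt"
  }
}

If embedding the schema in every message is too heavy, the linked post also covers schema-aware formats such as Avro with Schema Registry.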