value schema must be a struct

Time:01-06

I am sending nested JSON data to a Kafka consumer for a PostgreSQL sink. I am building the sink connector and, unfortunately, I can't change the data at the source. I want to send the data as it is, without any conversions, using Kafka.

Kafka Connect is showing this error:

[2023-01-04 22:58:15,227] ERROR WorkerSinkTask{id=Kafkapgsink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value schema must be of type Struct (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

My Kafka Connect properties are:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The sink properties are:

name=Kafkapgsink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
task.max=100
connection.url=jdbc:postgresql://localhost:5432/fileintegrity
connection.user=postgres
connection.password=Mysore@123
insert.mode=insert
auto.create=true
auto.evolve=true
table.name.format=fileeventslog
pk.mode=record_key
delete.enabled=true

CodePudding user response:

Your problem is here:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Strings do not have typed key-value pairs (i.e. structure), so StringConverter cannot give the JDBC sink the Struct value it requires.

More details here if you want to use JSON: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

As you see there, schemas.enable is only a property of JsonConverter, so setting it has no effect while StringConverter is in use.
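
As a rough sketch only, the worker settings might look like this with JsonConverter swapped in for the value. Keeping the key as a plain string is an assumption about how the topic is keyed, not something stated in the question:

```properties
# Sketch of worker converter settings (assumption: keys are plain strings)
key.converter=org.apache.kafka.connect.storage.StringConverter

# Parse the record value as JSON instead of treating it as an opaque string
value.converter=org.apache.kafka.connect.json.JsonConverter

# schemas.enable is read by JsonConverter; true means the schema travels inside each message
value.converter.schemas.enable=true
```

Note that with value.converter.schemas.enable=true, JsonConverter expects every message value to be a JSON object with top-level "schema" and "payload" fields. With it set to false, the value is parsed without a schema, and the JDBC sink will still reject it, because it needs a schema to map fields to columns. If the producer really cannot be changed to emit that envelope, a schema has to be attached somewhere between the source topic and the sink.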
