I am using Debezium on a MySQL table to capture changelogs into Kafka, with the Kafka Connect configuration below:
{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "xxxx",
    "database.password": "xxxx",
    "database.server.id": "42",
    "database.server.name": "xxxx",
    "table.whitelist": "demo.movies",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "dbhistory.demo",
    "decimal.handling.mode": "double",
    "include.schema.changes": "true",
    "transforms": "unwrap,dropTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "asgard.demo.(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
}
However, it is sending all of the existing records from the table to the Kafka topic, not just new changes.
Is there any way to read only the new changelog data?
CodePudding user response:
The default behavior is to snapshot the table first (read all existing rows) and then stream new changes from the binlog.
To read only new data, add "snapshot.mode": "schema_only" to the connector configuration. In this mode Debezium still captures the table schemas, which it needs in order to interpret the binlog, but it skips the existing rows.
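As a minimal sketch of the change, only one line is added; the rest of the configuration stays exactly as in your question:

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.mode": "schema_only",
    ...
}

One caveat: the snapshot mode is only consulted when the connector starts with no previously committed offsets, so if this connector has already run you will need to delete it and re-register it (or register it under a new name) for the setting to take effect.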