I am trying to use kafka-json-schema-console-producer to publish a message that contains both a key (with schema) and a value (with schema). Unfortunately, I am unable to find an example that does what I want.
I can follow the documentation and send simple messages:
kafka-json-schema-console-producer \
  --broker-list localhost:9092 \
  --topic some-topic \
  --property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "additionalProperties" : false,
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' < snacks.txt
Two questions:
- How do I add a key schema? Is it as simple as adding a "key.schema" property with syntax similar to value.schema?
- What does the actual command look like to send a JSON message with both a key schema and a value schema?
CodePudding user response:
Yes, add --property key.schema. There are also options for key.schema.file (a JSON Schema file on disk) and key.schema.id (a schema ID already registered in Schema Registry). Refer to the source code examples - https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java#L72
Extract (as of v6.2.0):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
  --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
  --property schema.registry.url=http://localhost:8081 \
  --property parse.key=true \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
Note that kafka-json-schema-console-producer is shorthand for:

kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader
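As a sketch of the file-based variant mentioned above (the file names here are hypothetical), you could write each schema to disk and point key.schema.file / value.schema.file at them instead of passing the JSON inline:

```shell
# Hypothetical file names: write the key and value JSON Schemas to disk
cat > key-schema.json <<'EOF'
{"type":"object","required":["id"],"properties":{"id":{"type":"integer"}}}
EOF
cat > value-schema.json <<'EOF'
{"type":"object","properties":{"f1":{"type":"string"}}}
EOF

# Then reference the files instead of inline schemas (sketch, not run here):
# kafka-json-schema-console-producer \
#   --broker-list localhost:9092 \
#   --topic some-topic \
#   --property parse.key=true \
#   --property key.schema.file=key-schema.json \
#   --property value.schema.file=value-schema.json
```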
CodePudding user response:
Thanks to help from @OneCricketeer, I was able to get this working. Below is a full example that sends messages with a JSON schema for both the key and the value. (BTW: I show a solution using Docker, but it should be simple to adapt it to run without Docker.)
Note that there are a few things that tripped me up:
- you will need the "parse.key=true" property (if it is omitted, the key is not parsed and you will either get an error or the key will be missing when consuming the message)
- the key schema looks identical to the value schema but requires a different definition/record name from the value (for the value: "definitions/record:myrecord", and for the key: "definitions/record:mykey")
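To make the first point concrete, here is a rough sketch (plain shell, purely illustrative, not the producer's actual implementation) of what parse.key=true combined with key.separator="|" does to each input line:

```shell
# Illustration only: the producer splits each input line at the first separator
line='{"id":1}|{"timestamp":"foo","data":"bar"}'
key="${line%%|*}"    # everything before the first "|" becomes the message key
value="${line#*|}"   # everything after it becomes the message value
echo "key=$key"
echo "value=$value"
```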
docker exec -it schema-registry \
  /usr/bin/kafka-json-schema-console-producer \
  --broker-list kafka:9092 \
  --topic source-1 \
  --property key.separator="|" \
  --property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' \
  --property parse.key=true \
  --property key.schema='
{
  "definitions" : {
    "record:mykey" : {
      "type" : "object",
      "required" : [ "id" ],
      "additionalProperties" : false,
      "properties" : {
        "id" : {"type" : "integer"}
      }
    }
  },
  "$ref" : "#/definitions/record:mykey"
}'
And the data:
{"id":1} | {"timestamp":"foo", "data":"bar"}
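To verify that both the key and the value arrived, the matching console consumer can print the key alongside the value (this is a sketch against the same Docker setup; the Schema Registry URL of http://localhost:8081 is an assumption -- adjust hosts and ports for your environment):

```shell
# Consume from the same topic and print key|value pairs (requires a running
# Kafka broker and Schema Registry; shown here for illustration)
docker exec -it schema-registry \
  /usr/bin/kafka-json-schema-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic source-1 \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true \
  --property key.separator="|"
```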