Using kafka-json-schema-console-producer to produce message with a key schema and a value schema


I am trying to use kafka-json-schema-console-producer to publish a message that contains both a key (with schema) and a value (with schema). Unfortunately, I am unable to find an example that does what I want.

I can follow the documentation and send simple messages:

kafka-json-schema-console-producer \
  --broker-list localhost:9092 \
  --topic some-topic \
  --property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "additionalProperties" : false,
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' < snacks.txt
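Before piping snacks.txt to the producer, it can help to sanity-check each input line against the schema's field rules. Below is a minimal stdlib-only Python sketch (the sample lines are hypothetical; it only checks the required/allowed field names from the "record:myrecord" schema above, not the field types):

```python
import json

# Field rules taken from the "record:myrecord" schema above
REQUIRED = {"name", "calories"}
ALLOWED = {"name", "calories", "colour"}  # additionalProperties is false

def check_line(line: str) -> bool:
    """Return True if a JSON line satisfies the schema's field rules."""
    record = json.loads(line)
    keys = set(record)
    return REQUIRED <= keys and keys <= ALLOWED

# Hypothetical sample lines, like those in snacks.txt
print(check_line('{"name":"cookie","calories":500,"colour":"brown"}'))  # True
print(check_line('{"name":"cookie"}'))  # False: missing "calories"
```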

Two questions:

  1. How do I add a key schema? Is it as simple as adding a "key.schema" and using similar syntax to the value.schema?
  2. What does the actual command look like to send a JSON message with a key schema and a value schema?

CodePudding user response:

  1. Yes, add --property key.schema. There are also key.schema.file for an on-disk JSON Schema file, and key.schema.id for a schema ID already registered in the registry.

  2. Refer to the source code example - https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java#L72

Extract (as of v6.2.0)

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
  --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
  --property schema.registry.url=http://localhost:8081 \
  --property parse.key=true \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'

kafka-json-schema-console-producer is shorthand for

kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader

CodePudding user response:

Thanks to help from @OneCricketeer, I was able to get this working. Below is a full example that will send messages with a JSON schema for both key and value. (BTW: I show a solution using Docker but it should be simple to modify this to not use Docker)

Note that there are a few things that tripped me up:

  • you will need the "parse.key=true" property (if it is omitted, the key is not parsed and you will either get an error or the key will be missing when consuming the message)
  • the key schema looks identical to the value schema but requires a different definition/record name (here the value uses "definitions/record:myrecord" and the key uses "definitions/record:mykey")
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-producer \
--broker-list kafka:9092 \
--topic source-1 \
--property key.separator="|" \
--property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' \
--property parse.key=true \
--property key.schema='
{
  "definitions" : {
    "record:mykey" : {
      "type" : "object",
      "required" : [ "id" ],
      "additionalProperties" : false,
      "properties" : {
        "id" : {"type" : "integer"}
      }
    }
  },
  "$ref" : "#/definitions/record:mykey"

And the data:

{"id":1} | {"timestamp":"foo", "data":"bar"}