Home > Blockchain >  How can I read and decode AVRO messages from Kafka along with their associated kafka key using Benth
How can I read and decode AVRO messages from Kafka along with their associated kafka key using Benth

Time:02-15

I am using Benthos to read AVRO-encoded messages from Kafka which have the kafka_key metadata field set to also contain an AVRO-encoded payload. The schemas of these AVRO-encoded payloads are stored in Schema Registry and Benthos has a schema_registry_decode processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields, one called content containing the decoded AVRO message and the other one called metadata containing the various metadata fields collected by Benthos including the decoded kafka_key payload.

CodePudding user response:

It turns out that one can achieve this using a branch processor like so:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}
  • Related