I am using Benthos to read AVRO-encoded messages from Kafka which have the kafka_key
metadata field set to also contain an AVRO-encoded payload. The schemas of these AVRO-encoded payloads are stored in Schema Registry and Benthos has a schema_registry_decode
processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields, one called content
containing the decoded AVRO message and the other one called metadata
containing the various metadata fields collected by Benthos including the decoded kafka_key
payload.
CodePudding user response:
It turns out that one can achieve this using a branch
processor like so:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# Decode the message
- schema_registry_decode:
url: http://localhost:8081
# Populate output content field
- bloblang: |
root.content = this
# Decode kafka_key metadata payload and populate output metadata field
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}