I have a requirement where I will receive JSON messages (with different schemas) on the same Kafka topic. However, when syncing to S3, I have to write to different folders in S3 based on the schema.
Could you please let me know how to do this in Kafka Connect? I'm looking at Single Message Transforms (SMTs) but am not able to figure it out.
Thanks
CodePudding user response:
Do you use the Avro serialiser?
Once you have the deserialised object, you can decide what the schema is:
if (yourMessage instanceof YourModelType1) {
    // handle the first schema
} else if (yourMessage instanceof YourModelType2) {
    // handle the second schema
}
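If you consume the messages as Avro GenericRecords, a minimal sketch of branching on the schema name could look like this (the schema full names are placeholders for your own types):

import org.apache.avro.generic.GenericRecord;

// Decide how to handle a deserialised record based on its Avro schema name.
// "com.example.Order" / "com.example.Payment" are hypothetical schema names.
void route(GenericRecord message) {
    String schemaName = message.getSchema().getFullName();
    if ("com.example.Order".equals(schemaName)) {
        // handle the first schema
    } else if ("com.example.Payment".equals(schemaName)) {
        // handle the second schema
    }
}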
CodePudding user response:
The bucket cannot be changed per record. Prefixes within the bucket (what you refer to as "folders") can vary, but only if you have field information in the value of the record to partition on.
e.g. some example data
{"type": "data1", "payload": {...}}
{"type": "data2", "payload": {...}}
With these configs
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=type
Should write to S3 as
s3://bucket/type=data1/topic-partition-offset.json
s3://bucket/type=data2/topic-partition-offset.json
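For context, a fuller set of S3 sink connector properties might look like the sketch below; the connector name, topic, bucket, region, and format settings are assumptions and would need to match your environment.

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=my-topic
s3.bucket.name=bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=type
flush.size=1000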
If the type field isn't available, then you will need to write a custom SMT that can inspect each of your records to determine what type it will be.
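As a rough illustration, such an SMT could look like the sketch below. It assumes schemaless JSON (so the value reaches the transform as a java.util.Map), and the rule for deriving the type from the payload is purely hypothetical; you would replace it with whatever actually distinguishes your schemas. The derived value is added as a type field so the FieldPartitioner above can partition on it.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Sketch of a custom SMT that derives a "type" field from the record value.
public class AddTypeField<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        Object value = record.value();
        if (!(value instanceof Map)) {
            return record; // leave non-map values untouched
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> original = (Map<String, Object>) value;
        Map<String, Object> updated = new HashMap<>(original);

        // Hypothetical rule: decide the type from the fields present in the payload.
        String type = original.containsKey("orderId") ? "data1" : "data2";
        updated.put("type", type);

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), updated, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

You would then register it on the sink with something like transforms=addType and transforms.addType.type=com.example.AddTypeField (class name assumed), placed before the partitioner picks up the type field.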