I'm using KafkaSink as the sink in my Flink application, and I need to send stringified JSONs to different Kafka topics based on some key-value pairs (for example, some JSONs go to topic1, others go to another topic, topic2, and so on). But I couldn't find anything in the documentation about choosing the Kafka topic based on the incoming data stream. Can someone please help me with this?
NOTE: I'm using Flink version 1.14.3
DataStream<String> data = .....

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(parameter.get("bootstrap.servers"))
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(parameter.get("kafka.output.topic"))
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

data.sinkTo(sink);
CodePudding user response:
I haven't tried this, but I believe that rather than using setTopic to hardwire each sink instance to a specific topic, you can instead implement the serialize method on your KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.
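A minimal sketch of what that could look like (the extractTopic helper and its routing condition are placeholders; you'd derive the topic from your JSON's key-value pairs however makes sense for your data):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class DynamicTopicSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        // Route each record to a topic derived from its own content.
        String topic = extractTopic(element);
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

    // Placeholder: inspect the JSON's key-value pairs and map them to a topic name.
    private String extractTopic(String json) {
        return json.contains("\"type\":\"a\"") ? "topic1" : "topic2";
    }
}

You would then register it in place of the builder-based schema, dropping setTopic:

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(parameter.get("bootstrap.servers"))
        .setRecordSerializer(new DynamicTopicSchema())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();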
Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to a set of side outputs, each connected to the appropriate sink.
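A rough sketch of that fan-out, assuming just two topics and an illustrative routing condition (topic1Sink and topic2Sink would each be a KafkaSink built with setTopic for its own topic):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<String> topic2Tag = new OutputTag<String>("topic2") {};

SingleOutputStreamOperator<String> routed = data.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.contains("\"type\":\"a\"")) {
                    out.collect(value);            // main output -> topic1Sink
                } else {
                    ctx.output(topic2Tag, value);  // side output -> topic2Sink
                }
            }
        });

routed.sinkTo(topic1Sink);
routed.getSideOutput(topic2Tag).sinkTo(topic2Sink);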