How to select the Kafka topic dynamically in the Apache Flink Kafka sink?

Time:06-30

I'm using KafkaSink as the sink in my Flink application, and I need to send stringified JSON records to different Kafka topics based on some key-value pairs (for example, some JSONs go to topic1, others go to topic2, and so on). But I couldn't find anything in the documentation about choosing the Kafka topic based on the incoming data stream. Can someone please help me with this?

NOTE: I'm using Flink version 1.14.3

    DataStream<String> data = .....
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(parameter.get("kafka.output.topic"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    data.sinkTo(sink);

Answer:

I haven't tried this, but I believe that rather than using setTopic to hardwire each sink instance to a specific topic, you can implement the serialize method of your own KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.
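A minimal sketch of that idea, untested against a real cluster. The class name TopicRoutingSerializer, the selectTopic helper, and the routing rule (a "type" field equal to "order") are all made up for illustration; only the topic names topic1 and topic2 come from the question. It assumes the Flink 1.14 KafkaRecordSerializationSchema interface.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical serializer that picks the target topic per record.
class TopicRoutingSerializer implements KafkaRecordSerializationSchema<String> {
    private static final long serialVersionUID = 1L;

    // Assumed routing rule: plain substring matching keeps the sketch short.
    // A real job would more likely parse the JSON and inspect the field.
    static String selectTopic(String json) {
        return json.contains("\"type\":\"order\"") ? "topic1" : "topic2";
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        // The topic is set on each record instead of once via setTopic(...).
        return new ProducerRecord<>(selectTopic(element), value);
    }
}
```

With something like this in place, the sink from the question would be built with .setRecordSerializer(new TopicRoutingSerializer()) instead of the KafkaRecordSerializationSchema.builder() chain.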

Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to a set of side outputs, each connected to the appropriate sink.
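The side-output variant could look roughly like the sketch below, again untested. SideOutputRouting, sinkFor, route, the TOPIC2 tag, and the "type":"order" rule are hypothetical names and assumptions; the sink configuration mirrors the one in the question, including the setDeliverGuarantee spelling used by Flink 1.14 (later releases name it setDeliveryGuarantee).

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

class SideOutputRouting {
    // Anonymous subclass so Flink can capture the element type of the tag.
    static final OutputTag<String> TOPIC2 = new OutputTag<String>("topic2") {};

    // One sink per topic, configured like the sink in the question.
    static KafkaSink<String> sinkFor(String bootstrapServers, String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }

    static void route(DataStream<String> data, String bootstrapServers) {
        SingleOutputStreamOperator<String> main = data.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.contains("\"type\":\"order\"")) { // assumed routing rule
                            out.collect(value);                     // main output, goes to topic1
                        } else {
                            ctx.output(TOPIC2, value);              // side output, goes to topic2
                        }
                    }
                });
        main.sinkTo(sinkFor(bootstrapServers, "topic1"));
        main.getSideOutput(TOPIC2).sinkTo(sinkFor(bootstrapServers, "topic2"));
    }
}
```

This needs one sink (and one OutputTag) per topic, so it fits a small, fixed set of topics; when the topic set is large or only known at runtime, the custom serializer approach is probably the simpler one.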
