Apache Flink sink doesn't get invoked when using ProcessFunction or RichFlatMapFunction

Time:11-27

My Flink application reads data from a Kafka source, maps it into an object, and writes it to another Kafka topic. Everything works if I use a MapFunction for the transformation, but as soon as I use an implementation that extends ProcessFunction or RichFlatMapFunction, the sink is never called (the code that writes to the Kafka topic is not executed at all). I am using ProcessFunction or RichFlatMapFunction because I need getRuntimeContext() to read and write ValueState. How can I get the sink to be invoked in this case?

// StatefulOrderMapper extends ProcessFunction
env.addSource(new FlinkKafkaConsumer<>("READ_FROM_TOPIC", new Deserializer(), pros))
    .keyBy(Order::getId)
    .process(new StatefulOrderMapper())
    .addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new Serializer(), props));

// DoSomeMapping extends MapFunction
env.addSource(new FlinkKafkaConsumer<>("READ_FROM_TOPIC", new Deserializer(), pros))
    .keyBy(Order::getId)
    .map(new DoSomeMapping())
    .addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new OrderSerializer(), props));

CodePudding user response:

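One big difference between map and flatMap or process is that a map emits the return value of the MapFunction, whereas a FlatMapFunction or a ProcessFunction returns void and uses a Collector to emit events. If your flatMap() or processElement() implementation never calls out.collect(...), no records are sent downstream, so the sink has nothing to write.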
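The difference can be sketched in plain Java without a Flink dependency. This is only an illustration: EmitDemo, its nested Collector and CollectingFunction interfaces, and the runMap/runProcess helpers are hypothetical stand-ins that loosely mirror how a map-style operator forwards return values while a process-style operator forwards only what is passed to collect(). In a real job you would use Flink's own org.apache.flink.util.Collector, which is passed to processElement() and flatMap() as the last parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class EmitDemo {
    // Simplified stand-in for Flink's Collector (hypothetical, for illustration only).
    public interface Collector<T> {
        void collect(T record);
    }

    // Simplified stand-in for a flatMap/process-style function:
    // it returns void and can emit only through the collector.
    public interface CollectingFunction<I, O> {
        void process(I value, Collector<O> out);
    }

    // Map-style operator: every return value is forwarded to the "sink" list.
    public static <I, O> List<O> runMap(List<I> input, Function<I, O> fn) {
        List<O> sink = new ArrayList<>();
        for (I value : input) {
            sink.add(fn.apply(value));
        }
        return sink;
    }

    // Process-style operator: only records passed to collect() reach the "sink" list.
    public static <I, O> List<O> runProcess(List<I> input, CollectingFunction<I, O> fn) {
        List<O> sink = new ArrayList<>();
        for (I value : input) {
            fn.process(value, sink::add);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("a", "b");

        // The return value is emitted automatically.
        List<String> viaMap = runMap(orders, v -> v.toUpperCase());

        // The value is computed but never collected, so the sink stays empty.
        List<String> forgotten = runProcess(orders, (v, out) -> {
            String mapped = v.toUpperCase();
        });

        // The value is explicitly collected, so the sink receives it.
        List<String> collected = runProcess(orders, (v, out) -> out.collect(v.toUpperCase()));

        System.out.println(viaMap);     // prints [A, B]
        System.out.println(forgotten);  // prints []
        System.out.println(collected);  // prints [A, B]
    }
}
```

If StatefulOrderMapper looks like the second case, adding an out.collect(...) call for each record that should reach Kafka is the likely fix; that is an assumption, since the body of StatefulOrderMapper is not shown in the question.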

For a working example you can use as a starting point, I suggest taking a look at https://docs.immerok.cloud/docs/how-to-guides/development/batch-and-streaming-with-the-apache-flink-table-and-datastream-apis/#the-datastream-workflow. You'll find the code in https://github.com/immerok/recipes/tree/main/latest-transaction.

Note: I work for Immerok.
