My Flink application reads data from one Kafka source, maps it into an object, and writes it to another Kafka topic. Everything works fine if I use a MapFunction
to transform, but as soon as I use an implementation that extends ProcessFunction or RichFlatMapFunction,
the sink never gets called (the code that writes to the Kafka topic doesn't execute at all). The reason I'm using ProcessFunction
or RichFlatMapFunction
is that I need the RuntimeContext to read and write ValueState. How can I get the sink called in this case?
env.addSource(new FlinkKafkaConsumer<>("READ_FROM_TOPIC", new Deserializer(), props))
   .keyBy(Order::getId)
   .process(new StatefulOrderMapper()) // StatefulOrderMapper extends ProcessFunction
   .addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new Serializer(), props));

env.addSource(new FlinkKafkaConsumer<>("READ_FROM_TOPIC", new Deserializer(), props))
   .keyBy(Order::getId)
   .map(new DoSomeMapping()) // DoSomeMapping extends MapFunction
   .addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new OrderSerializer(), props));
CodePudding user response:
One big difference between map and flatMap or process is that map emits the return value of the MapFunction, whereas a FlatMapFunction or a ProcessFunction emits events through the Collector it is handed. If your processElement method never calls out.collect(...), nothing is ever sent downstream, and the sink is never invoked.
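To make the difference concrete, here is a minimal, self-contained sketch (not using Flink itself; the Collector and ProcessFn interfaces below are simplified stand-ins for Flink's Collector and ProcessFunction). It shows that a map-style operator forwards the function's return value automatically, while a process-style operator only forwards what the function explicitly collects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified stand-in for Flink's org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Simplified stand-in for a ProcessFunction's processElement signature.
interface ProcessFn<IN, OUT> {
    void processElement(IN value, Collector<OUT> out);
}

public class CollectorDemo {

    // map semantics: the framework emits the RETURN VALUE of the function.
    static <IN, OUT> List<OUT> runMap(List<IN> input, Function<IN, OUT> mapFn) {
        List<OUT> sink = new ArrayList<>();
        for (IN record : input) {
            sink.add(mapFn.apply(record)); // emitted automatically
        }
        return sink;
    }

    // process semantics: only records passed to out.collect(...) reach the sink.
    static <IN, OUT> List<OUT> runProcess(List<IN> input, ProcessFn<IN, OUT> fn) {
        List<OUT> sink = new ArrayList<>();
        for (IN record : input) {
            fn.processElement(record, sink::add);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("a", "b");

        // map: both records reach the sink.
        List<String> mapped = runMap(orders, s -> s.toUpperCase());

        // process WITHOUT collect: the sink stays empty -- the reported symptom.
        List<String> broken = runProcess(orders, (value, out) -> {
            String transformed = value.toUpperCase(); // computed but never emitted
        });

        // process WITH collect: both records reach the sink again.
        List<String> fixed = runProcess(orders, (value, out) -> out.collect(value.toUpperCase()));

        System.out.println(mapped + " " + broken + " " + fixed);
    }
}
```

So in your StatefulOrderMapper, make sure every record you want forwarded to the Kafka sink goes through out.collect(...) inside processElement.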
For a working example you can use as a starting point, I suggest taking a look at https://docs.immerok.cloud/docs/how-to-guides/development/batch-and-streaming-with-the-apache-flink-table-and-datastream-apis/#the-datastream-workflow. You'll find the code in https://github.com/immerok/recipes/tree/main/latest-transaction.
Note: I work for Immerok.