Flink using multiple structures of data in Java


I am reading data from Kafka in Java to perform some processing in Apache Flink and sink the results.

I have the Kafka topic topic_a, which has some records like {name: "abc", age: 20} and others like {pin: 111, number: 999999, address: "some place"}.

When I read the data from Kafka using KafkaSource, I deserialize the records into a POJO with the fields String name and int age, along with their respective getters, setters, and constructor.

When I run the Flink code, the deserializer works fine for {name: "abc", age: 20}:

KafkaSource<AllDataPOJO> kafkaAllAlertsSource = KafkaSource.<AllDataPOJO>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(Arrays.asList("topic_a"))
                .setProperties(properties)
                .setGroupId(allEventsGroupID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new AllDataDeserializationSchema())
                .build();
public class AllDataPOJO {
    private String name;
    private int age;
    // getters, setters, and constructor omitted
}

The code runs fine for {name: "abc", age: 20}, but as soon as a record like {pin: 111, number: 999999, address: "some place"} arrives, it starts failing.

2 questions:

  1. Is there any way that I can read such varying formats of messages and perform Flink operations on them? Depending on what kind of message comes in, I wish to route it to a different Kafka topic.
  2. When I get {name: "abc", age: 20}, it should go to the topic user_basic, and {pin: 111, number: 999999, address: "some place"} should go to the topic user_details.

How can I achieve the above with just one Flink Java job?

CodePudding user response:

You might be interested in specifying your Deserialization Schema as:

.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
        

Then you would map and filter that source, validating which fields are present:

Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>)

Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>)

Or cast the objects to existing POJOs inside your map.
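Putting that together, a minimal sketch of the whole job might look as follows (assuming Flink 1.15+ with the flink-connector-kafka dependency; bootstrapServers is a placeholder, and the user_basic/user_details topic names come from the question):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class RouteByShapeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String bootstrapServers = "localhost:9092"; // placeholder

        // Deserialize every record into a generic ObjectNode instead of a fixed POJO.
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("topic_a")
                .setGroupId("all-events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // false = do not attach Kafka metadata to the ObjectNode
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new JSONKeyValueDeserializationSchema(false)))
                .build();

        DataStream<ObjectNode> all =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic_a");

        // The schema nests the record payload under "value"; route on field presence.
        DataStream<String> basic = all
                .filter(node -> node.get("value").has("name"))
                .map(node -> node.get("value").toString());
        DataStream<String> details = all
                .filter(node -> node.get("value").has("pin"))
                .map(node -> node.get("value").toString());

        basic.sinkTo(topicSink(bootstrapServers, "user_basic"));
        details.sinkTo(topicSink(bootstrapServers, "user_details"));

        env.execute("route-by-shape");
    }

    // One KafkaSink per destination topic, writing the JSON back out as a string.
    private static KafkaSink<String> topicSink(String bootstrapServers, String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
    }
}

Both output streams come from the same source, so a single job handles both shapes; records matching neither filter are simply dropped (you could add a third filter to catch them).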

CodePudding user response:

You cannot use <AllDataPOJO> as the source's type parameter if the topic also carries records matching other POJO classes with different fields.

Alternatively, you would need to add all fields from all POJO types to one class and make them nullable when they don't exist in the data. But that may be error-prone: name and pin could then coexist in the same record, for example, when they shouldn't.

Otherwise, as the other answer says, use a more generic String/JSON deserializer, and then use filter/map operations to cast your data into more concrete types, depending on which fields are available.
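As a minimal sketch of that approach (assuming the source uses setValueOnlyDeserializer(new SimpleStringSchema()) so records arrive as raw JSON strings; UserBasic is a hypothetical stand-in for your own POJO):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical POJO for the {name, age} shape.
class UserBasic {
    public String name;
    public int age;
}

// Emits a UserBasic only for records that actually have that shape.
public class ToUserBasic extends RichFlatMapFunction<String, UserBasic> {

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public void flatMap(String json, Collector<UserBasic> out) throws Exception {
        JsonNode node = mapper.readTree(json);
        if (node.has("name")) {
            out.collect(mapper.treeToValue(node, UserBasic.class));
        }
    }
}

Usage would be something like DataStream<UserBasic> basics = raw.flatMap(new ToUserBasic()); a symmetric function can emit the user_details shape, and each typed stream then gets its own sink.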
