Home > Software engineering >  How to assign json kafka message to Java object?
How to assign json kafka message to Java object?

Time:08-05

I created a kafka consumer service using spring boot. I need to get the message from the kafka and assign it to the java object and do some calculations according to the data in the object and store it in MongoDB database.

Here is my kafka listner.

    @KafkaListener(topics = "neworder", groupId = "group_json",
        containerFactory = "orderKafkaListenerFactory")
public void consumeJson(List<Order> order) {
   System.out.println("Consumed JSON Message: "   order);
}

Here is the received kafka message.

Consumed JSON Message: [{Id=11, FualType=Petrol 92, Capacity=33000}]

Here is my Java class.

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

 @Document("Order")
 public class Order {

    @Id
    private int id;
    private String type;
    private int capacity;

public Order() {
}

public Order(int id, String type, int capacity) {
    this.id = id;
    this.type = type;
    this.capacity = capacity;
}

public int getId() {
    return id;
}

public void setId(int id) {
    this.id = id;
}

public String getType() {
    return type;
}

public void setType(String type) {
    this.type = type;
}

public int getCapacity() {
    return capacity;
}

public void setCapacity(int capacity) {
    this.capacity = capacity;
}

}

I need to assign the received message data from kafka to this java class. Anyone can help me?

CodePudding user response:

I assume you are getting some errors at the moment. It would help if you could add them to your question.

First, since the fields in the json of the Kafka message do not match those of your java class, you have to annotate them:

@Id
@JsonProperty("Id")
private int id;

@JsonProperty("FualType")
private String type;

@JsonProperty("Capacity")
private int capacity;

In terms of configuration of the consumer, normally it is enough to configure the JsonDeserializer for the value of the kafka message and use it in the ConsumerFactory:

@Bean
public ConsumerFactory<String, List<Order>> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

and

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
    ...
    return props;
}
  • Related