Kafka: serializing a Java object with a list of objects/LocalDateTime into byte[]


I've just started learning Kafka and have run into some trouble when trying to send an object using KafkaTemplate. The custom serializers online all seem to use objectMapper.writeValueAsBytes() to serialize a Java object into byte[].

The classes look something like this:

class ADTO {
    /* some attributes */
    List<BDTO> bDTOs;
    LocalDateTime localDate;
}

class BDTO {
    /* some attributes */
}

and the custom serializer looks something like this:

public class ADTOSerializer implements Serializer<ADTO> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String s, ADTO aDTO) {
        try {
            if (aDTO == null){
                return null;
            }
            return objectMapper.writeValueAsBytes(aDTO);
        } catch (Exception e) {
            // pass the cause along so the underlying Jackson error isn't swallowed
            throw new SerializationException("Error when serializing RecipeDTO to byte[]", e);
        }
    }

    @Override
    public void close() {
    }
}

The object is sent through the CommandLineRunner below:

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, ADTO> kafkaTemplate){
        return args -> {
            ADTO aDTO = new ADTO(
                /*attributes*/
            );
            kafkaTemplate.send("topic", aDTO);
        };
    }

However, this always threw an exception ("Failed to execute CommandLineRunner... Caused by SerializationException: Error when serializing RecipeDTO to byte[]").

I'm assuming that's caused by either the list of BDTO or the LocalDateTime attribute that exists in ADTO, possibly both. Do I need to write a serializer for both of them in order for this to work? Some pseudocode would be much appreciated.


Some more context: this is the producer config

    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
        return props;
    }

topic

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic customTopic(){
        return TopicBuilder.name("topic").build();
    }
}

and this is the Kafka listener

@Component
public class KafkaListeners {
    @KafkaListener(
        topics = "topic",
        groupId = "groupID"
    )
    void listener(ADTO aDTO){
    }
}

CodePudding user response:

I believe you have to configure your producer configuration something like the following, because ADTOSerializer is the class implementing Serializer:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
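One detail worth noting: since the template is typed KafkaTemplate<String, ADTO>, the key is a plain String, so the key serializer would normally be Kafka's built-in StringSerializer rather than ADTOSerializer. A sketch of the adjusted config (assuming the standard kafka-clients StringSerializer class):

```java
// Keys are plain Strings, so use Kafka's built-in StringSerializer for them;
// ADTOSerializer is only needed for the ADTO values.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
```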

CodePudding user response:

I figured it out. I needed to annotate the LocalDateTime field with

    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
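For reference, a sketch of how the annotated field might look in ADTO (this assumes the jackson-datatype-jsr310 module is on the classpath, which is where LocalDateTimeSerializer and LocalDateTimeDeserializer live):

```java
import java.time.LocalDateTime;
import java.util.List;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

class ADTO {
    /* some attributes */
    List<BDTO> bDTOs;

    // Jackson cannot handle java.time types out of the box; these annotations
    // tell it to use the JSR-310 (de)serializers for this field only.
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    LocalDateTime localDate;
}
```

An alternative to annotating each field is registering the module once on the ObjectMapper inside ADTOSerializer, e.g. objectMapper.registerModule(new JavaTimeModule()), which handles all java.time fields in one place.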