Home > Software design >  Kafka serialize a java object with a list of object/LocalDateTime into byte[]
Kafka serialize a java object with a list of object/LocalDateTime into byte[]


I've just started learning Kafka and have ran into some troubles when trying to send an object using kafka template. The custom serializers online all seems to use objectMapper.writeValueAsBytes(); to serialize a java object into byte[].

The classes look something like this

Class ADTO{
/*some attributes*/
List[] BDTO bDTOs;
LocalDateTime localDate;

Class BDTO{
/*some attributes*/

and the custom serializer look something like this

public class ADTOSerializer implements Serializer<ADTO> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void configure(Map<String, ?> configs, boolean isKey) {

    public byte[] serialize(String s, ADTO aDTO) {
        try {
            if (aDTO == null){
                return null;
            return objectMapper.writeValueAsBytes(aDTO);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing RecipeDTO to byte[]");

    public void close() {

The object is sent through the commandLineRunner below

    CommandLineRunner commandLineRunner(KafkaTemplate<String, ADTO> kefkaTemplate){
        return args -> {
            ADTO aDTO = new ADTO(
            kefkaTemplate.send("topic", aDTO);

However, this always threw an exception ("Failed to execute CommandLineRunner... Caused by SerializationException: Error when serializing RecipeDTO to byte[]").

I'm assuming that's caused by either the list of BDTO or the LocalDateTime attribute that exits in ADTO. Possibly both. Do I need to write a serializer for both of them in order for this to work? some pseudo code would be much appreciated.

Some more contexts: this is the producer config

    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
        return props;


public class KafkaTopicConfiguration {

    public NewTopic customTopic(){
        return TopicBuilder.name("topic").build();

and this is the Kafka listener

public class KafkaListeners {
        topics = "topic",
        groupId = "groupID"
    void listener(ADTO aDTO){

CodePudding user response:

I believe, you have to configure your producer configuration some thing like as following, because ADTOSerializer is the one implementing the Serializer

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);

CodePudding user response:

I figured it out. I needed to annotate the LocalDateTimeField with

    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
  • Related