I've just started learning Kafka and have run into trouble when trying to send an object using KafkaTemplate. The custom serializers I found online all seem to use objectMapper.writeValueAsBytes()
to serialize a Java object into byte[].
The classes look something like this:
class ADTO {
    /* some attributes */
    List<BDTO> bDTOs;
    LocalDateTime localDate;
}
class BDTO {
    /* some attributes */
}
and the custom serializer looks something like this:
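import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

public class ADTOSerializer implements Serializer<ADTO> {
    // Plain ObjectMapper with default settings, no extra modules registered
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String s, ADTO aDTO) {
        try {
            if (aDTO == null) {
                return null;
            }
            // Convert the DTO to JSON bytes
            return objectMapper.writeValueAsBytes(aDTO);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing RecipeDTO to byte[]");
        }
    }

    @Override
    public void close() {
    }
}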
The object is sent through the CommandLineRunner below:
@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, ADTO> kafkaTemplate) {
    return args -> {
        ADTO aDTO = new ADTO(
            /* attributes */
        );
        kafkaTemplate.send("topic", aDTO);
    };
}
However, this always threw an exception ("Failed to execute CommandLineRunner... Caused by SerializationException: Error when serializing RecipeDTO to byte[]").
I'm assuming that's caused by either the list of BDTO or the LocalDateTime attribute that exists in ADTO, possibly both. Do I need to write a serializer for each of them in order for this to work? Some pseudocode would be much appreciated.
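A side note on debugging: the catch block in ADTOSerializer throws a new SerializationException without passing the caught exception along, so the stack trace cannot show what Jackson actually complained about. Kafka's SerializationException also has a (String, Throwable) constructor, so the cause can be kept. Below is a minimal sketch of the difference using only JDK classes; WrapperException and failingWrite are hypothetical stand-ins for SerializationException and objectMapper.writeValueAsBytes, and the error message is simulated.

```java
import java.io.IOException;

// Hypothetical stand-in for Kafka's SerializationException, so the sketch runs with the JDK alone.
class WrapperException extends RuntimeException {
    WrapperException(String message) { super(message); }
    WrapperException(String message, Throwable cause) { super(message, cause); }
}

class CauseDemo {
    // Hypothetical stand-in for objectMapper.writeValueAsBytes(aDTO) failing.
    static byte[] failingWrite() throws IOException {
        throw new IOException("LocalDateTime not supported (simulated)");
    }

    // Mirrors the catch block in the question: the original exception is dropped.
    static RuntimeException withoutCause() {
        try {
            failingWrite();
            return null;
        } catch (Exception e) {
            return new WrapperException("Error when serializing to byte[]");
        }
    }

    // Same wrapping, but the caught exception is passed along as the cause.
    static RuntimeException withCause() {
        try {
            failingWrite();
            return null;
        } catch (Exception e) {
            return new WrapperException("Error when serializing to byte[]", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(withoutCause().getCause());            // prints "null"
        System.out.println(withCause().getCause().getMessage());  // prints the simulated message
    }
}
```

With the cause attached, the "Caused by" chain in the log would continue down to the underlying Jackson exception instead of stopping at the wrapper.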
Some more context: this is the producer config
public Map<String, Object> producerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
    return props;
}
this is the topic configuration
@Configuration
public class KafkaTopicConfiguration {
    @Bean
    public NewTopic customTopic() {
        return TopicBuilder.name("topic").build();
    }
}
and this is the Kafka listener
@Component
public class KafkaListeners {
    @KafkaListener(
        topics = "topic",
        groupId = "groupID"
    )
    void listener(ADTO aDTO) {
    }
}
CodePudding user response:
I believe you have to set up your producer configuration something like the following, because ADTOSerializer is the class implementing Serializer:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ADTOSerializer.class);
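One thing worth checking in this config: the template in the question is declared as KafkaTemplate<String, ADTO>, so keys are Strings. The send call in the question passes no key, which is why ADTOSerializer as key serializer goes unnoticed there, but sending a non-null String key would likely fail at serialization time. A sketch of the config with StringSerializer for keys, assuming kafka-clients is on the classpath; ProducerProps is a hypothetical helper, and valueSerializer would be ADTOSerializer.class from the question:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper; requires the kafka-clients dependency.
final class ProducerProps {
    // valueSerializer is expected to be a Serializer implementation such as ADTOSerializer.class.
    static Map<String, Object> producerConfig(String bootstrapServers, Class<?> valueSerializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // String keys match the KafkaTemplate<String, ADTO> declaration
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return props;
    }
}
```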
CodePudding user response:
I figured it out. I needed to annotate the LocalDateTime field with
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
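An alternative that avoids annotating every date field is to register JavaTimeModule on the ObjectMapper used inside the serializer. This is only a sketch, assuming jackson-databind and jackson-datatype-jsr310 are on the classpath; ADTOMapperFactory is a hypothetical name. The List<BDTO> field should not need a custom serializer, provided BDTO exposes its attributes through getters or public fields.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

// Hypothetical helper; requires jackson-databind and jackson-datatype-jsr310.
final class ADTOMapperFactory {
    static ObjectMapper create() {
        return new ObjectMapper()
                // Adds support for java.time types such as LocalDateTime
                .registerModule(new JavaTimeModule())
                // Write dates as ISO-8601 strings instead of numeric arrays
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }
}
```

In ADTOSerializer, the objectMapper field would then be initialised with ADTOMapperFactory.create() instead of new ObjectMapper(), and the consumer side would need a mapper configured the same way.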