Define custom value deserializer on KafkaListener

Time:07-29

In my main application.properties, I have:

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

This is all working fine, and the values are deserialized to a String as expected.

Now, in my integration tests, I want to introduce another KafkaListener (that is, a second listener, I do not want to override the behavior of the one in my main application!), but this time with another value deserializer (to a byte array). Is this possible, without having to introduce a custom ListenerContainerFactory for this listener?

I tried the following, but without success:

@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}

I am getting:

java.lang.ClassCastException: java.lang.String cannot be cast to [B

This means that the ad-hoc value deserializer as defined on the KafkaListener in the integration test is not used.

I am currently using spring-kafka 2.5.8.

CodePudding user response:

properties takes an array of strings, one Kafka consumer property per entry, and the keys can reference the ConsumerConfig constants:

@KafkaListener(topics = ..., properties = {
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}

https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties

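This uses the Kafka consumer property name value.deserializer (dot, not hyphen) as the key. value-deserializer is only the suffix of the Spring Boot property spring.kafka.consumer.value-deserializer; it is not a Kafka consumer property, so the override in the question had no effect.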
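To see why the key matters more than the separator, here is a minimal sketch that uses only java.util.Properties. It assumes, as the @KafkaListener documentation describes, that each entry is read with properties-file syntax, where `=` and `:` are both valid separators. The class name ListenerPropertySyntax and the parse helper are hypothetical, made up for this illustration; they are not part of spring-kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ListenerPropertySyntax {

    // Reads a single entry the way java.util.Properties reads one line of a properties file.
    static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        // Entry from the question: the ':' separator parses fine, but the key is the hyphenated name.
        Properties fromQuestion = parse("value-deserializer:" + deserializer);

        // Entry from the answer: the key is the Kafka consumer property name.
        Properties fromAnswer = parse("value.deserializer=" + deserializer);

        System.out.println("question key(s): " + fromQuestion.stringPropertyNames());
        System.out.println("answer key(s):   " + fromAnswer.stringPropertyNames());
    }
}
```

Both entries parse to the same value; only the key differs. Under the assumption above, the hyphenated key is passed along as an unknown property, so the consumer keeps the StringDeserializer from application.properties.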

Alternatively, you can load a different properties file in your integration test.
