In my main application.properties, I have:
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
This is all working fine, and the values are deserialized to a String as expected.
Now, in my integration tests, I want to introduce another KafkaListener (that is, a second listener; I do not want to override the behavior of the one in my main application), but this time with a different value deserializer (to a byte array). Is this possible without having to introduce a custom ListenerContainerFactory for this listener?
I tried the following, but without success:
@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
...
}
I am getting:
java.lang.ClassCastException: java.lang.String cannot be cast to [B
This means that the ad-hoc value deserializer defined on the KafkaListener in the integration test is not used.
I am currently using spring-kafka 2.5.8.
CodePudding user response:
properties is an array of strings, each in key=value form, and the keys can reference constants such as those in ConsumerConfig:
@KafkaListener(topics = ..., properties = {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
...
}
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties
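This correctly uses value.deserializer as the key (with a dot, not a hyphen). The hyphenated value-deserializer form is only the Spring Boot property name under the spring.kafka.consumer prefix; it is not a Kafka consumer config key.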
Or in your integration test, you can load a different property file.
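To see why the hyphenated entry from the question has no effect, here is a minimal sketch that uses only java.util.Properties. It assumes the entries in properties are read in the standard Java properties format, as the linked reference section describes; the class name ListenerPropertyKeys and the helper parse are hypothetical and not part of spring-kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ListenerPropertyKeys {

    // Parses one entry the way a line of a Java properties file is parsed:
    // the key ends at the first '=', ':' or whitespace character.
    // (Assumption: this mirrors how the annotation's entries are interpreted.)
    static Properties parse(String entry) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(entry));
        return props;
    }

    public static void main(String[] args) throws IOException {
        String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        // Entry from the question: it parses without error, but the resulting key
        // "value-deserializer" is not a Kafka consumer configuration name.
        Properties hyphenated = parse("value-deserializer:" + deserializer);

        // Entry from the answer: "value.deserializer" is the string behind
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.
        Properties dotted = parse("value.deserializer=" + deserializer);

        System.out.println(hyphenated.stringPropertyNames());
        System.out.println(dotted.stringPropertyNames());
    }
}
```

Both entries parse, so the colon in the original attempt is not the problem. Only the second one yields the key the Kafka consumer actually looks up, which would explain why the StringDeserializer from application.properties stayed in effect.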