I am currently consuming massages from different Kafka topics using Spring Boot and @KafkaListener
annotation.
All of the topics currently contain messages in Avro format (I'm using KafkaAvroDeserializer
).
But now, I have to read a new topic, which contains messages in JSON format.
Is it possible to use JsonDeserializer
with this new topic and KafkaAvroDeserializer
with the legacy topics?
CodePudding user response:
I would suggest using the following configuration for declaring Kafka listener container factories:
@Configuration
public class KafkaConsumersConfig {
@Value(value = "${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
private <K, V> ConsumerFactory<K, V> generateFactory(
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, YourClass>
kafkaListenerContainerFactoryJson() {
ConcurrentKafkaListenerContainerFactory<String, YourClass> factory =
new ConcurrentKafkaListenerContainerFactory<>();
var consumerFactory = generateFactory(new StringDeserializer(), new JsonDeserializer<>(YourClass.class, false));
factory.setConsumerFactory(consumerFactory);
return factory;
}
//TODO: Declare bean for Avro deserializer
}
Where generateFactory
method is used to generate a factory based on given key and value deserializers.
For different pairs of key and value deserializers, a new ConcurrentKafkaListenerContainerFactory
bean should be declared.
Accordingly, KafkaListener
should be declared as shown below.
@KafkaListener(
topics = "${kafka.topic.jsonTopic}",
containerFactory = "kafkaListenerContainerFactoryJson")
public void consume(ConsumerRecord<String, YourClass> record) {
// ...
}
Where containerFactory
corresponds to the bean name from the configuration class.