Home > Back-end >  Kafka: Different Deserializers For Different Topics
Kafka: Different Deserializers For Different Topics

Time:10-19

I am currently consuming massages from different Kafka topics using Spring Boot and @KafkaListener annotation.

All of the topics currently contain messages in Avro format (I'm using KafkaAvroDeserializer).

But now, I have to read a new topic, which contains messages in JSON format.

Is it possible to use JsonDeserializer with this new topic and KafkaAvroDeserializer with the legacy topics?

CodePudding user response:

I would suggest using the following configuration for declaring Kafka listener container factories:

@Configuration
public class KafkaConsumersConfig {
  @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
  private String bootstrapServers;

  private <K, V> ConsumerFactory<K, V> generateFactory(
      Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, YourClass>
      kafkaListenerContainerFactoryJson() {
    ConcurrentKafkaListenerContainerFactory<String, YourClass> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    var consumerFactory = generateFactory(new StringDeserializer(), new JsonDeserializer<>(YourClass.class, false));
    factory.setConsumerFactory(consumerFactory);
    return factory;
  }

  //TODO: Declare bean for Avro deserializer
}

Where generateFactory method is used to generate a factory based on given key and value deserializers. For different pairs of key and value deserializers, a new ConcurrentKafkaListenerContainerFactory bean should be declared.

Accordingly, KafkaListener should be declared as shown below.

@KafkaListener(
      topics = "${kafka.topic.jsonTopic}",
      containerFactory = "kafkaListenerContainerFactoryJson")
  public void consume(ConsumerRecord<String, YourClass> record) {
    // ...
  }

Where containerFactory corresponds to the bean name from the configuration class.

  • Related