spring Kafka integration test listener not working (KAFKA JsonDeserializer)


KafkaConsumerConfig.java

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);



    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

    return props;
}

public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
    ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new KafkaErrorHandler());
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());

    return factory;
}

MetadataFileCustom.java

@KafkaListener(topics = TOPIC,
        groupId = GROUP,
        containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
    log.info(TOPIC + " ===> RECEIVED MESSAGE: " + metadataFileIntegrationDTO);
    metadataFileService.save(metadataFileIntegrationDTO);
}

If I change my consumerFactoryMetadataFileIntegration to

public ConsumerFactory consumerFactoryMetadataFileIntegration() {
    return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
            new StringDeserializer());
}

it works, but Sonar complains about the raw ConsumerFactory type.

Error: Listener failed; nested exception is java.lang.IllegalStateException: Only String, Bytes, or byte[] supported

CodePudding user response:

The error is saying that, since you've configured a StringDeserializer for the value, the @KafkaListener method of that factory needs to receive a String, Bytes, or byte[], and not your DTO object.
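A minimal sketch of what such a listener could look like, assuming a ConsumerFactory<String, String> and no message converter on the container factory; the inline ObjectMapper and the exception handling are illustrative assumptions, not taken from the question:

@KafkaListener(topics = TOPIC,
        groupId = GROUP,
        containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(String payload) {
    try {
        // The record value arrives as the raw JSON String; map it to the DTO manually.
        MetadataFileIntegrationDTO dto =
                new ObjectMapper().readValue(payload, MetadataFileIntegrationDTO.class);
        log.info(TOPIC + " ===> RECEIVED MESSAGE: " + dto);
        metadataFileService.save(dto);
    } catch (IOException e) {
        log.error("Failed to map payload from " + TOPIC, e);
    }
}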

CodePudding user response:

You can imagine the consumer flow with a MessageConverter like this:

  1. Native deserializer (StringDeserializer in your case) deserializes byte[] messages to String messages.

  2. Consumer.poll() returns these String messages.

  3. Your MessageConverter (StringJsonMessageConverter) converts these String messages to your type MetadataFileIntegrationDTO (determined by the parameters of your @KafkaListener method).


So when you defined your native deserializer as JsonDeserializer (corresponding to ConsumerFactory<String, MetadataFileIntegrationDTO>), consumer.poll() returned MetadataFileIntegrationDTO messages, which is not a type the StringJsonMessageConverter can process, hence the Only String, Bytes, or byte[] supported error.
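One way to make that combination consistent (a sketch based on the beans above, not necessarily the only fix) is to keep the JsonDeserializer and drop the StringJsonMessageConverter, so that consumer.poll() already returns the DTO and nothing downstream expects a String:

public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
    // The JsonDeserializer produces the DTO directly from the record bytes.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
    ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new KafkaErrorHandler());
    // No message converter here - the deserialized value is already MetadataFileIntegrationDTO.
    factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
    return factory;
}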

And when you changed the JsonDeserializer to a StringDeserializer, the corresponding ConsumerFactory became ConsumerFactory<String, String>. This means that when you create a consumer from this ConsumerFactory, consumer.poll() returns String values, which the StringJsonMessageConverter can then convert to your DTO.
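Alternatively, if you keep the StringDeserializer plus StringJsonMessageConverter combination, the factories should be typed as <String, String>, which also resolves the raw-type warning Sonar raised in the question; a sketch under that assumption:

public ConsumerFactory<String, String> consumerFactoryMetadataFileIntegration() {
    // Key and value stay String; the message converter does the JSON-to-DTO step later.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new StringDeserializer());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryMetadataFileIntegration() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new KafkaErrorHandler());
    // StringJsonMessageConverter converts the String payload to the @KafkaListener parameter type.
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
    return factory;
}

In this setup the @KafkaListener method can keep its MetadataFileIntegrationDTO parameter, because the converter resolves the target type from the method signature (step 3 above).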
