KafkaConsumerConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return props;
}
public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new KafkaErrorHandler());
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
return factory;
}
MetadataFileCustom.Java
@KafkaListener(topics = TOPIC,
groupId = GROUP,
containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
log.info(TOPIC "===> RECEIVED MESSAGE:" metadataFileIntegrationDTO);
metadataFileService.save(metadataFileIntegrationDTO);
}
if I change my consumerFactoryMetadataFileIntegration to
public ConsumerFactory consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
works, but the sonar complains..
Error: Listener failed; nested exception is java.lang.IllegalStateException: Only String, Bytes, or byte[] supported
CodePudding user response:
The error is saying that since you've used a StringDeserializer class for the value, then your KafkaListener method of that factory needs to be Only String, Bytes, or byte[]
, and not your DTO object
CodePudding user response:
You can imagine the consumer flow with MessageConverter like:
Native deserializer (
StringDeserializer
in your case) deserializesbyte[]
messages toString
messages.Consumer.poll() returns these
String
messages.Your MessageConverter (
StringJsonMessageConverter
) converts theseString
messages to your typeMetadataFileIntegrationDTO
(determined by params in @KafkaListener)
So when you defined your native deserializer as JsonDeserializer
(corresponding to ConsumerFactory<String, MetadataFileIntegrationDTO>
), the consumer.poll() returned MetadataFileIntegrationDTO
messages, and that wasn't the type the StringJsonMessageConverter
can process (you could see Only String, Bytes, or byte[] supported)
And when you changed JsonDeserializer
to StringDeserializer
, the corresponding ConsumerFactory
was ConsumerFactory<String, String>
. This means that when you create a new Consumer
from this ConsumerFactory
, the consumer.poll() returns String
.