Hi, I am using Apache Kafka to consume messages from another application. I want to handle error scenarios where message deserialization or conversion fails. The messages are received as objects defined by an Avro schema.
I implemented the following:
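import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the Spring Boot auto-configured settings to the factory.
        configurer.configure(factory, kafkaConsumerFactory);
        // Log the exception and the failed record instead of rethrowing.
        factory.setErrorHandler((exception, data) -> {
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        });
        return factory;
    }
}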
But if I pass a message of a different object type, the above code does not handle it. I tried passing a string; it throws the error below but never reaches the error handler.
org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "taas.cacib.lscsad-dev.queue.wwfdbtemp.Avros-value" io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject "taas.cacib.lscsad-dev.queue.wwfdbtemp.Avros-value"
CodePudding user response:
Did you change the schema at any point after creating the Kafka topic?
If so, you would have to reset the Kafka topic to pick up the new schema. If you want to change the schema of a topic, you need to create a new topic.
CodePudding user response:
Use an ErrorHandlingDeserializer:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
Then, deserialization errors will be sent directly to the error handler, which by default treats such errors as fatal: it does not retry them but sends them straight to the recoverer (which logs the record by default).
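As a rough sketch of what that configuration could look like, the consumer properties below register ErrorHandlingDeserializer as the key and value deserializer and name the real deserializers as its delegates. The property keys are written as plain strings so the snippet has no dependencies; they are the string values behind the spring-kafka and Kafka client constants. The class name ErrorHandlingDeserializerProps, the choice of StringDeserializer for keys, and the use of Confluent's KafkaAvroDeserializer for values are assumptions for illustration, not something stated in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties that wrap the real
// deserializers in spring-kafka's ErrorHandlingDeserializer.
final class ErrorHandlingDeserializerProps {

    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        // Kafka itself only sees the wrapper, which catches deserialization exceptions.
        props.put("key.deserializer", ERROR_HANDLING_DESERIALIZER);
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);
        // The wrapper delegates the actual work to these classes.
        // Assumption: String keys and Confluent Avro values.
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("spring.deserializer.value.delegate.class",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties, one per line.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a Spring Boot application the same four entries would typically go into the consumer configuration that backs the ConsumerFactory, alongside the schema registry settings the Avro deserializer needs.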