Home > Enterprise >  This error handler cannot process 'SerializationException's directly; please consider conf
This error handler cannot process 'SerializationException's directly; please consider conf

Time:12-07

Produder properties

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Consumer properties

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

Consumer Service

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: "   user.getAge()   " Fav Genre "   user.getFavGenre());
    }
}

Producer Service

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

Producer Config for creating topic

    @Configuration public class KafkaConfig {
    
        @Bean
        public NewTopic topicOrder() {
            return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
        } 
}

Producer works well but Consumer gives error like

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an

'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0] DefaultErrorHandler.java:149 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1760 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1283 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na] Executors.java:539 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] FutureTask.java:264 at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na] Thread.java:833 Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption. at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1429 at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:134 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1652 at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:1488 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:721 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na] Fetcher.java:672 at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1277 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1238 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1211 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1507 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessageListenerContainer.java:1497 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessage

I will be glad if you help since I am new to kafka and trying to figure out why getting this error

CodePudding user response:

Does the error message not tell you anything?

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

.. // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

With Boot:

...
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
...
  • Related