I Have Simple NestJs Kafka producer and Springboot Kafka Consumer . when i send messge in kafka producer the kafka consumer give the following error.
//
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1809) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) ~[spring-kafka-2.8.8.jar:2.8.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition schdule-topics-0 at offset 1. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.1.jar:na]
at jdk.internal.reflect.GeneratedMethodAccessor50.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.22.jar:5.3.22]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.22.jar:5.3.22]
at jdk.proxy2/jdk.proxy2.$Proxy112.poll(Unknown Source) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1529) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1343) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) ~[spring-kafka-2.8.8.jar:2.8.8]
... 3 common frames omitted
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.22.jar:5.3.22]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1439) ~[kafka-clients-3.1.1.jar:na]
... 21 common frames omitted
// Here is my codes
@Injectable() export class TestResponseService {
constructor(private readonly producerService:ProducerService) {}
//
async scheduleResponse(event:OrderEvent){
console.log("Test response hits");
await this.producerService.produce({
topic:'schdule-topics',
messages: [{
//value: JSON.parse(JSON.stringify(event)),
value: "Hi",
},],
});
console.log("Test response below hits");
return 'Hello World...!';
}
}
// and My Spring boot code here
@Service
public class Consumer{
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(topics="${spring.kafka.topic.name}",groupId="${spring.kafka.consumer.group-id}")
public void consume(){
//LOGGER.info(String.format("Order event received in Test => %s", event.toString()));
System.out.println("order ID is printing in test: ");
}
}
CodePudding user response:
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
Since it is coming from a non-Spring application, you need to tell the deserializer what type you want it to create from the JSON.
See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-serde
If you are configuring the deserializer via consumer properties, set
spring.kafka.consumer.properties.spring.json.default.value.type=com.example.MyClass
CodePudding user response:
Your NestJS event payload is just a String Hi
, and not JSON. Therefore, use StringDeserializer
rather than JsonDeserializer
in the Spring consumer config.