I want to implement manual commit with ack mode MANUAL_IMMDEDIATE. I use AcknowledgingMessageListener class to make consumer. Its fine while application run but after consumer receive data then application got error but application still running. Error happen only after data received. Any suggest will be appreciate , Thanks
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
public class MyCustomMessageListener extends CustomMessageListener {
@Override
@SneakyThrows
public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic) {
MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
createDefaultMethodKafkaListenerEndpoint(name, topic);
kafkaListenerEndpoint.setBean(new MyMessageListener());
kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class));
return kafkaListenerEndpoint;
}
@Slf4j
private static class MyMessageListener implements AcknowledgingMessageListener<String, String> {
/**
* Invoked with data from kafka.
* @param acknowledgment ack
* @param data the data to be processed.
*/
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
log.info("My message listener got a new record: " data);
acknowledgment.acknowledge();
log.info("My message listener done processing record: " data);
}
}
}
APPLICATION LOG
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:28.908 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:28.909 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:29.450 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:29.452 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:30.039 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:30.041 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:30.710 ERROR 32664 --- [antopicPE-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for consumerpe-0@61
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
CodePudding user response:
as per documentation says, dont use onMessage with ConsumerRecord, because this message listener is acknowledging, it need the acknowledgement to acknowledge
Please rewrite this line
kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class))
on method
public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic)
CodePudding user response:
MethodKafkaListenerEndpoint
is not intended to be used this way; it is for @KafkaListener
methods.
Do not use an endpoint at all, simply create a listener container and add your listener to its container properties.