Home > Net >  Error while implement AcknowledgingMessageListener<String, String>
Error while implement AcknowledgingMessageListener<String, String>

Time:06-08

I want to implement manual commit with ack mode MANUAL_IMMDEDIATE. I use AcknowledgingMessageListener class to make consumer. Its fine while application run but after consumer receive data then application got error but application still running. Error happen only after data received. Any suggest will be appreciate , Thanks


import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Component
public class MyCustomMessageListener extends CustomMessageListener {

    @Override
    @SneakyThrows
    public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                createDefaultMethodKafkaListenerEndpoint(name, topic);
        kafkaListenerEndpoint.setBean(new MyMessageListener());
        kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class));
        return kafkaListenerEndpoint;
    }

    @Slf4j
    private static class MyMessageListener implements AcknowledgingMessageListener<String, String> {

        /**
         * Invoked with data from kafka.
         * @param acknowledgment ack 
         * @param data           the data to be processed.
         */
        @Override
        public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
            log.info("My message listener got a new record: "   data);
            acknowledgment.acknowledge();
            log.info("My message listener done processing record: "   data);
        }

    }

}

APPLICATION LOG

Caused by: java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
    at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 11 common frames omitted

2022-06-07 11:15:28.908  INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:28.909 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 9 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
    at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 11 common frames omitted

2022-06-07 11:15:29.450  INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:29.452 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 9 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
    at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 11 common frames omitted

2022-06-07 11:15:30.039  INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:30.041 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 9 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
    at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 11 common frames omitted

2022-06-07 11:15:30.710 ERROR 32664 --- [antopicPE-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for consumerpe-0@61

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
    at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
    at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
    ... 11 common frames omitted

CodePudding user response:

enter image description here

as per documentation says, dont use onMessage with ConsumerRecord, because this message listener is acknowledging, it need the acknowledgement to acknowledge

Please rewrite this line

kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class)) 

on method

public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic)

CodePudding user response:

MethodKafkaListenerEndpoint is not intended to be used this way; it is for @KafkaListener methods.

Do not use an endpoint at all, simply create a listener container and add your listener to its container properties.

  • Related