I have a Spring Boot Kafka consumer with the configuration below. I am trying to use manual acknowledgment instead of auto commit, but since switching to manual acknowledgment I get an error. The Spring Boot version is 2.7.2.
kafka.consumer.groupId=mcs-ccp-event
message.topic.name=mcs_ccp_test
kafka.bootstrapAddress=kafka-dev-app-a1.com:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
and this is the consumer configuration:
@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.consumer.groupId}")
    private String groupId;

    public ConsumerFactory<String, Event> eventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.xxx.mcsccpkafkaconsumer.vo.Event");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> eventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(eventConsumerFactory());
        return factory;
    }
}
This is my listener:
@KafkaListener(topics = "${message.topic.name}", containerFactory = "eventKafkaListenerContainerFactory", groupId = "${kafka.consumer.groupId}")
public void eventListener(@Payload Event event, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        Acknowledgment acknowledgment) {
    log.info("Received event message: {} from partition : {}", event, partition);
    persistEventToDB(event);
    acknowledgment.acknowledge();
    this.eventLatch.countDown();
}
Whenever the consumer receives a message from the producer, it always throws this error:
2022-08-06 11:16:11.749 ERROR 37700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff none exhausted for mcs__ccp-1@122
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2683) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2643) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2570) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2451) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2000) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) ~[spring-kafka-2.8.8.jar:2.8.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
... 11 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}], failedMessage=GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
... 15 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
... 14 common frames omitted
Answer:
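You're setting the ack-mode and auto-offset-reset in the properties file, which Spring Boot's auto-configuration uses to set up its own KafkaListenerContainerFactory.
But because you declare your own KafkaListenerContainerFactory bean, auto-configuration backs off and your programmatic configuration is used instead. The spring.kafka.* properties are therefore never applied to the container your listener uses, so it runs with the default ack mode rather than a manual one, and no Acknowledgment is available.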
You can set the properties for your consumer factory directly in the properties file and let Spring Boot create the beans; then there's no need for this KafkaConsumerConfig class.
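As a rough sketch of that first option, the properties file might look like the following. It assumes the standard spring.kafka.* keys read by Spring Boot's Kafka auto-configuration and reuses the broker address, group id, and Event class from the question; the two spring.json.* entries are the string values behind JsonDeserializer.VALUE_DEFAULT_TYPE and JsonDeserializer.USE_TYPE_INFO_HEADERS.

```properties
# Connection and consumer settings picked up by Spring Boot's auto-configuration
spring.kafka.bootstrap-servers=kafka-dev-app-a1.com:9092
spring.kafka.consumer.group-id=mcs-ccp-event
spring.kafka.consumer.auto-offset-reset=earliest

# Deserializers that the custom ConsumerFactory previously set in code
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.xxx.mcsccpkafkaconsumer.vo.Event
spring.kafka.consumer.properties.spring.json.use.type.headers=false

# Applied to the auto-configured listener container factory
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
```

With this approach the listener would also need to stop pointing at the custom factory: drop containerFactory = "eventKafkaListenerContainerFactory" from @KafkaListener so that the auto-configured kafkaListenerContainerFactory bean is used.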
Or you can set the ack mode and auto-offset-reset directly in the factory bean you're declaring, instead of in the properties file.
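A minimal sketch of that second option, assuming the rest of the question's KafkaConsumerConfig stays as it is: the ack mode is set on the factory's ContainerProperties, and auto-offset-reset goes into the consumer property map. This is a Spring configuration fragment that needs spring-kafka and the question's Event class on the classpath, not a standalone program.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.xxx.mcsccpkafkaconsumer.vo.Event;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value("${kafka.consumer.groupId}")
    private String groupId;

    public ConsumerFactory<String, Event> eventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Replaces spring.kafka.consumer.auto-offset-reset, which this custom factory does not read
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.xxx.mcsccpkafkaconsumer.vo.Event");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> eventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(eventConsumerFactory());
        // Replaces spring.kafka.listener.ack-mode: a manual ack mode is what makes
        // the Acknowledgment argument available to the @KafkaListener method
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

The listener from the question can stay unchanged with this variant, since it already references eventKafkaListenerContainerFactory by name.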