Problem reading kafka headers in a RecordInterceptor after upgrading to spring-kafka 2.8

Time:01-25

I have an application using Spring Boot and Spring Kafka that reads the delivery attempt header in a record interceptor so that I can include it in log messages. This worked well until I upgraded to Spring Boot 2.6.3 and Spring Kafka 2.8.2 (from 2.5.5/2.7.7).

Now, when I try to read the delivery attempt header in the interceptor, it is not available. If I do exactly the same thing within a message listener, it works fine, so the header is clearly being set.

This is what a simplified record interceptor and the listener container factory look like:

        @Bean
        public RecordInterceptor<Object, Object> recordInterceptor() {
            return record -> {
                int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
                log.info("delivery " + delivery);
                return record;
            };
        }


        @Bean
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory) {

            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            factory.getContainerProperties().setDeliveryAttemptHeader(true);
            factory.setRecordInterceptor(recordInterceptor());

            return factory;
        }

I can't see anything in the spring docs suggesting the behaviour should have changed. Any ideas?

Answer:

This is a bug; I opened an issue. https://github.com/spring-projects/spring-kafka/issues/2082
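
Until a version containing the fix is in use, the interceptor from the question will likely fail with a NullPointerException, because `lastHeader(...)` returns null when the header is absent and the code calls `.value()` on it directly. A minimal sketch of a defensive decoder follows; the class name `DeliveryAttemptDecoder` and the method `decode` are hypothetical helpers, not part of spring-kafka, and the sketch assumes the header value is a 4-byte big-endian int, as the `ByteBuffer.wrap(...).getInt()` call in the question implies.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical helper (not a spring-kafka class): decodes the raw bytes of the
// delivery attempt header without throwing when the header is missing.
class DeliveryAttemptDecoder {

    // Returns the attempt count, or an empty OptionalInt when the value is
    // null or shorter than the 4 bytes needed for an int.
    static OptionalInt decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length < Integer.BYTES) {
            return OptionalInt.empty();
        }
        // ByteBuffer reads big-endian by default, matching the question's code.
        return OptionalInt.of(ByteBuffer.wrap(headerValue).getInt());
    }
}
```

In the interceptor, this would be called with `header == null ? null : header.value()`, where `header` is the result of `record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT)`, and the log line can fall back to a placeholder when the result is empty. This only avoids the exception; it does not make the header appear in the interceptor on an affected version.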
