Spring-Kafka BatchInterceptor doesn't work as expected

Time:10-27

I have a spring-cloud-stream project that uses the Kafka binder. The application consumes messages in batch mode, and I need to filter the consumed records by a specific header. For this I use a BatchInterceptor:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
    BatchInterceptor<String, String> customInterceptor
) {
    return (((container, destinationName, group) -> {
        container.setBatchInterceptor(customInterceptor);
        log.info("Container customized");
    }));
}

@Bean
public BatchInterceptor<String, String> customInterceptor() {
    return (consumerRecords, consumer) -> {
        log.info("Origin records count: {}", consumerRecords.count());
        final Set<TopicPartition> partitions = consumerRecords.partitions();
        final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader
            = Stream.of(partitions).flatMap(Collection::stream)
            .collect(Collectors.toMap(
                Function.identity(),
                p -> Stream.ofNullable(consumerRecords.records(p))
                    .flatMap(Collection::stream)
                    .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                    .collect(Collectors.toList())
            ));
        var filteredRecords = new ConsumerRecords<>(filteredByHeader);
        log.info("Filtered count: {}", filteredRecords.count());
        return filteredRecords;
    };
}

The example code is available here: batch interceptor example.

In the logs I can see that the records are filtered successfully, but the records that should have been filtered out still reach the consumer.

Why does the BatchInterceptor not filter the records? How can I filter ConsumerRecords by a specific header in spring-cloud-stream with batch mode enabled? You can run the tests from the example to reproduce the behavior.

CodePudding user response:

You are using very old code (Boot 2.5.0), which is out of OSS support; the same applies to the Spring Cloud version.

https://spring.io/projects/spring-boot#support

I tested your interceptor with current versions (Boot 2.7.5, Spring Cloud 2021.0.4) and it works fine:

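import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchInterceptor;

@SpringBootApplication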
public class So74203611Application {

    private static final Logger log = LoggerFactory.getLogger(So74203611Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So74203611Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
            BatchInterceptor<String, String> customInterceptor) {

        return (((container, destinationName, group) -> {
            container.setBatchInterceptor(customInterceptor);
            log.info("Container customized {}", destinationName);
        }));
    }

    @Bean
    public BatchInterceptor<String, String> customInterceptor() {
        return (consumerRecords, consumer) -> {
            log.info("Origin records count: {}", consumerRecords.count());
            final Set<TopicPartition> partitions = consumerRecords.partitions();
            final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader = Stream.of(partitions)
                    .flatMap(Collection::stream)
                    .collect(Collectors.toMap(Function.identity(),
                            p -> Stream.ofNullable(consumerRecords.records(p)).flatMap(Collection::stream)
                                    .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                    .collect(Collectors.toList())));
            var filteredRecords = new ConsumerRecords<>(filteredByHeader);
            log.info("Filtered count: {}", filteredRecords.count());
            return filteredRecords;
        };
    }

    @Bean
    Consumer<List<String>> input() {
        return str -> {
            System.out.println(str);
        };
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            // One record that carries the TEST header: the interceptor should keep it.
            Headers headers = new RecordHeaders();
            headers.add("TEST", "foo".getBytes());
            ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "bar".getBytes(),
                    headers);
            template.send(rec);
            // Two records without the header: the interceptor should drop both.
            headers = new RecordHeaders();
            rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "baz".getBytes(), headers);
            template.send(rec);
            template.send(rec);
        };
    }

}

Properties:

spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

Output (only the record with the TEST header reaches the consumer):

[bar]
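
To see the per-partition filtering in isolation, here is a minimal sketch that runs without Kafka or Spring on the classpath. It only models the logic of the interceptor above. The names `HeaderFilterSketch`, `Rec`, `filterByHeader` and `count` are made up for illustration and are not part of spring-kafka or the Kafka client, and a record is reduced to a partition number, a value, and a set of header names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class HeaderFilterSketch {

    // Hypothetical stand-in for ConsumerRecord: partition, value and header names only.
    static final class Rec {
        final int partition;
        final String value;
        final Set<String> headerNames;

        Rec(int partition, String value, Set<String> headerNames) {
            this.partition = partition;
            this.value = value;
            this.headerNames = headerNames;
        }
    }

    // Keeps, per partition, only the records that carry the given header.
    // A partition whose records are all dropped stays in the map with an empty list,
    // which is also what the Collectors.toMap call in the interceptor produces.
    static Map<Integer, List<Rec>> filterByHeader(Map<Integer, List<Rec>> batch, String header) {
        Map<Integer, List<Rec>> filtered = new LinkedHashMap<>();
        batch.forEach((partition, records) -> filtered.put(partition,
                records.stream()
                        .filter(r -> r.headerNames.contains(header))
                        .collect(Collectors.toList())));
        return filtered;
    }

    // Total number of records across all partitions.
    static int count(Map<Integer, List<Rec>> batch) {
        return batch.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        // Same shape as the test data above: one record with TEST, two without.
        Map<Integer, List<Rec>> batch = Map.of(0, List.of(
                new Rec(0, "bar", Set.of("TEST")),
                new Rec(0, "baz", Set.of()),
                new Rec(0, "baz", Set.of())));

        Map<Integer, List<Rec>> filtered = filterByHeader(batch, "TEST");

        System.out.println("Origin records count: " + count(batch));
        System.out.println("Filtered count: " + count(filtered));
        System.out.println(filtered.get(0).stream().map(r -> r.value).collect(Collectors.toList()));
    }
}
```

Running `main` prints an origin count of 3, a filtered count of 1, and `[bar]`, which matches what the consumer receives in the test above. If the same logic produces the right counts in your logs but the consumer still sees every record, the filtering itself is probably not the problem, and the library versions are the first thing to check.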