I have a spring-cloud-stream project that uses the Kafka binder. The application consumes messages in batch mode, and I need to filter the consumed records by a specific header. For this I use a BatchInterceptor:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
        BatchInterceptor<String, String> customInterceptor) {

    return (container, destinationName, group) -> {
        container.setBatchInterceptor(customInterceptor);
        log.info("Container customized");
    };
}
@Bean
public BatchInterceptor<String, String> customInterceptor() {
    return (consumerRecords, consumer) -> {
        log.info("Origin records count: {}", consumerRecords.count());
        final Set<TopicPartition> partitions = consumerRecords.partitions();
        final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader =
                Stream.of(partitions).flatMap(Collection::stream)
                        .collect(Collectors.toMap(
                                Function.identity(),
                                p -> Stream.ofNullable(consumerRecords.records(p))
                                        .flatMap(Collection::stream)
                                        .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                        .collect(Collectors.toList())));
        var filteredRecords = new ConsumerRecords<>(filteredByHeader);
        log.info("Filtered count: {}", filteredRecords.count());
        return filteredRecords;
    };
}
Example code is here: batch interceptor example.
In the logs I can see that the records are filtered successfully, but the filtered-out records still reach the consumer.
Why does the BatchInterceptor not filter the records? How can I filter ConsumerRecords by a specific header in spring-cloud-stream with batch mode enabled? You can run the tests from the example to reproduce the behavior.
CodePudding user response:
You are using very old versions (Boot 2.5.0), which are out of OSS support (the same is true of the corresponding Spring Cloud release train):
https://spring.io/projects/spring-boot#support
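If you build with Maven, moving to a supported line is just a matter of bumping the Boot parent and importing the matching Cloud BOM. A minimal sketch, assuming a standard spring-boot-starter-parent setup (the versions are the ones used for the test below):

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.5</version>
</parent>

<dependencyManagement>
    <dependencies>
        <!-- Release train that pairs with Boot 2.7.x -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>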
I tested your interceptor with current versions and it works fine.
Boot 2.7.5, cloud 2021.0.4:
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchInterceptor;

@SpringBootApplication
public class So74203611Application {

    private static final Logger log = LoggerFactory.getLogger(So74203611Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So74203611Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
            BatchInterceptor<String, String> customInterceptor) {

        return (container, destinationName, group) -> {
            container.setBatchInterceptor(customInterceptor);
            log.info("Container customized {}", destinationName);
        };
    }

    @Bean
    public BatchInterceptor<String, String> customInterceptor() {
        return (consumerRecords, consumer) -> {
            log.info("Origin records count: {}", consumerRecords.count());
            final Set<TopicPartition> partitions = consumerRecords.partitions();
            final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader =
                    Stream.of(partitions).flatMap(Collection::stream)
                            .collect(Collectors.toMap(
                                    Function.identity(),
                                    p -> Stream.ofNullable(consumerRecords.records(p))
                                            .flatMap(Collection::stream)
                                            .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                            .collect(Collectors.toList())));
            var filteredRecords = new ConsumerRecords<>(filteredByHeader);
            log.info("Filtered count: {}", filteredRecords.count());
            return filteredRecords;
        };
    }

    @Bean
    Consumer<List<String>> input() {
        return str -> {
            System.out.println(str);
        };
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            // one record with the TEST header...
            Headers headers = new RecordHeaders();
            headers.add("TEST", "foo".getBytes());
            ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "bar".getBytes(),
                    headers);
            template.send(rec);
            // ...and two without it, which the interceptor should drop
            headers = new RecordHeaders();
            rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "baz".getBytes(), headers);
            template.send(rec);
            template.send(rec);
        };
    }

}
application.properties:

spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
The output shows that only the record with the TEST header reaches the consumer:

[bar]
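As a side note, ConsumerRecords.records(p) never returns null for a partition obtained from partitions() (it falls back to an empty list), so the Stream.ofNullable guard is not needed and the map in the interceptor can be built a little more directly. A behavior-equivalent sketch:

final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader =
        consumerRecords.partitions().stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        // keep only the records carrying the TEST header
                        p -> consumerRecords.records(p).stream()
                                .filter(r -> r.headers().lastHeader("TEST") != null)
                                .collect(Collectors.toList())));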