My task is to get the Kafka message before the method with the @KafkaListner annotation, check the correlationId and requestId headers in it. If they're present, flush them to MDC or generate them otherwise.
And my question is how to get Kafka message with headers before method with the @KafkaListner?
CodePudding user response:
You can try to write your own ConsumerInterceptor
following instructions from here.
Apache Kafka provides a mechanism to add interceptors to producers and consumers. These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans. However, you can manually wire in those dependencies using the interceptor config() method. The following Spring Boot application shows how to do this by overriding boot’s default factories to add some dependent bean into the configuration properties.
ConsumerFactory definition:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
Interceptor definition:
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
CodePudding user response:
Add a RecordInterceptor
to the listener container (or factory that creates it).