Home > Blockchain >  How do I get a Kafka message before the @KafkaListener method?
How do I get a Kafka message before the @KafkaListener method?

Time:10-24

My task is to get the Kafka message before the method with the @KafkaListner annotation, check the correlationId and requestId headers in it. If they're present, flush them to MDC or generate them otherwise.

And my question is how to get Kafka message with headers before method with the @KafkaListner?

CodePudding user response:

You can try to write your own ConsumerInterceptor following instructions from here.
Apache Kafka provides a mechanism to add interceptors to producers and consumers. These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans. However, you can manually wire in those dependencies using the interceptor config() method. The following Spring Boot application shows how to do this by overriding boot’s default factories to add some dependent bean into the configuration properties.

ConsumerFactory definition:

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
        Map<String, Object> consumerProperties = new HashMap<>();
        // consumerProperties.put(..., ...)
        // ...
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        consumerProperties.put("some.bean", someBean);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

Interceptor definition:

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private SomeBean bean;

    @Override
    public void configure(Map<String, ?> configs) {
        this.bean = (SomeBean) configs.get("some.bean");
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        this.bean.someMethod("consumer interceptor");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

}

CodePudding user response:

Add a RecordInterceptor to the listener container (or factory that creates it).

  • Related