Passing headers From Kafka ConsumerRecord to Kafka ProducerRecord / RestTemplate / Feign

Time:10-08

We are planning to use custom headers for testing purposes, and they need to be passed along across our microservices. Our applications use Java/Kotlin with Spring Boot.

Thanks to RequestContextHolder we can easily get headers from HTTP requests (servlets) and pass them wherever we want. For example, we add them to every Feign request:

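import feign.RequestInterceptor
import feign.RequestTemplate
import org.springframework.web.context.request.RequestContextHolder
import org.springframework.web.context.request.ServletRequestAttributes

// Copies every header of the current servlet request onto the outgoing Feign request.
class CustomRequestInterceptor : RequestInterceptor {

    override fun apply(template: RequestTemplate) {

        // Null when no servlet request is bound to this thread (e.g. a Kafka listener thread).
        val requestAttributes = RequestContextHolder.getRequestAttributes() as? ServletRequestAttributes

        requestAttributes?.request?.let { request ->
            request.headerNames.toList()
                .forEach { name -> template.header(name, request.getHeader(name)) }
        }
    }
}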

For Kafka Producers there's ProducerInterceptor.

But is there a way to catch ConsumerRecord's Headers and pass them further?

We planned to use a ThreadLocal variable (like RequestContextHolder does), but we didn't find a place to both set and clear the headers in it.

For example, we can store the headers in a RecordInterceptor, but we can't clear the variable when the thread finishes processing the current record. The ThreadLocal keeps its value, so the next record handled on that thread may see the wrong headers.

Are there any interfaces or tools which can help us? Thanks in advance!

Answer:

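Version 2.7 added success and failure methods to the RecordInterceptor; they are called after the listener returns normally or throws, so the ThreadLocal can be cleared there.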

The upcoming version 2.8 adds afterRecord (which is called after the ErrorHandler).

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java#L50-L93
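As a rough sketch of how this could look (not taken from the linked source): the interceptor below stores the record's headers in a ThreadLocal before the listener runs and removes them in afterRecord. It assumes spring-kafka 2.8, where afterRecord exists and the single-argument intercept is still the abstract method; other versions have a different intercept signature, so check against the one in use. KafkaHeadersHolder and HeaderCapturingInterceptor are hypothetical names, not part of the library.

```kotlin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Headers
import org.springframework.kafka.listener.RecordInterceptor

// Hypothetical holder, playing the role RequestContextHolder plays for servlets.
object KafkaHeadersHolder {
    private val current = ThreadLocal<Headers?>()

    fun set(headers: Headers) = current.set(headers)
    fun get(): Headers? = current.get()
    fun clear() = current.remove()
}

// Hypothetical interceptor, written against the spring-kafka 2.8 interface (assumption).
class HeaderCapturingInterceptor<K, V> : RecordInterceptor<K, V> {

    // Runs on the consumer thread before the listener is invoked.
    override fun intercept(record: ConsumerRecord<K, V>): ConsumerRecord<K, V>? {
        KafkaHeadersHolder.set(record.headers())
        return record
    }

    // 2.8+: runs after the listener and after the error handler,
    // so the next record on this thread never sees stale headers.
    override fun afterRecord(record: ConsumerRecord<K, V>, consumer: Consumer<K, V>) {
        KafkaHeadersHolder.clear()
    }
}
```

The interceptor would be registered on the listener container factory with setRecordInterceptor. Code running on the listener thread (a Feign RequestInterceptor, for instance) can then read KafkaHeadersHolder.get(). On 2.7, where afterRecord is not available, the same clear() call would go into both success and failure.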

Another alternative is to add a MethodInterceptor to the container's advice chain.

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java#L847-L852
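A minimal sketch of the advice-chain variant, assuming the advice wraps the listener's onMessage call and that a ConsumerRecord is among its arguments (a batch listener would receive a collection instead and is not handled here). HeaderPropagationAdvice and currentKafkaHeaders are hypothetical names. The try/finally gives the set-and-clear pairing the question asks for:

```kotlin
import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Headers

// Hypothetical per-thread storage for the headers of the record being processed.
private val headersOfCurrentRecord = ThreadLocal<Headers?>()

// Hypothetical accessor for code running on the listener thread.
fun currentKafkaHeaders(): Headers? = headersOfCurrentRecord.get()

// Hypothetical advice: binds the headers for the duration of one listener call.
class HeaderPropagationAdvice : MethodInterceptor {

    override fun invoke(invocation: MethodInvocation): Any? {
        // Assumption: the record is passed as one of the method arguments.
        val record = invocation.arguments
            .filterIsInstance<ConsumerRecord<*, *>>()
            .firstOrNull()

        record?.let { headersOfCurrentRecord.set(it.headers()) }
        try {
            return invocation.proceed()
        } finally {
            // Always cleared, whether the listener returned or threw.
            headersOfCurrentRecord.remove()
        }
    }
}
```

It would be added through the container properties, e.g. factory.containerProperties.setAdviceChain(HeaderPropagationAdvice()). Unlike the RecordInterceptor approach, the headers are only bound while the listener method itself runs, so an error handler invoked afterwards would not see them.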
