I wanted to try out tracing between a Spring Boot 3 Kafka producer and consumer. I followed the examples in the Spring blog post (https://spring.io/blog/2022/10/12/observability-with-spring-boot-3). The trace ID is added automatically when I make an API call using RestTemplate between the producer and consumer, but it is not carried over with the Kafka messages. I used the following dependencies:
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-zipkin</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.0</version>
</dependency>
Kafka producer configuration
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    // typical properties
    return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory);
    // Tried enabling observation on the producer side
    stringStringKafkaTemplate.setObservationEnabled(true);
    return stringStringKafkaTemplate;
}
Sending message
kafkaTemplate.send("topic-1", "message");
On the consumer side:
@KafkaListener(topics = "topic-1", groupId = "group1")
public void listenGroupFoo(String message) {
    logger.info("Received Message in group foo: " + message);
}
When I inspect the Kafka message headers, each message has a traceparent header, but I don't see the traceId and spanId in the consumer logs.
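To confirm that the producer side is propagating the trace, the traceparent header value can be decoded by hand. The following is a minimal sketch, assuming the header follows the W3C Trace Context version 00 layout (version, trace ID, parent span ID, flags, separated by dashes). The class name TraceparentDemo, the record Traceparent, and the sample value are illustrative and not part of the original setup.

```java
import java.nio.charset.StandardCharsets;

public class TraceparentDemo {

    /** Holds the four dash-separated fields of a version 00 traceparent value. */
    record Traceparent(String version, String traceId, String parentSpanId, String flags) {}

    /** Parses a raw header value; Kafka header values are byte arrays. */
    static Traceparent parse(byte[] headerValue) {
        String text = new String(headerValue, StandardCharsets.UTF_8);
        String[] parts = text.split("-");
        // Version 00 has exactly four fields: a 32-hex-char trace ID and a 16-hex-char span ID.
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("Not a valid traceparent: " + text);
        }
        return new Traceparent(parts[0], parts[1], parts[2], parts[3]);
    }

    public static void main(String[] args) {
        // Hypothetical sample value in the W3C format, not taken from a real message.
        byte[] raw = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
                .getBytes(StandardCharsets.UTF_8);
        Traceparent tp = parse(raw);
        System.out.println("traceId=" + tp.traceId() + " parentSpanId=" + tp.parentSpanId());
    }
}
```

If the trace ID decoded from the header matches the one in the producer logs, the producer is propagating context correctly and the gap is on the consumer side.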
CodePudding user response:
Spring for Apache Kafka does not enable tracing by default; see
https://docs.spring.io/spring-kafka/docs/current/reference/html/#observation
Micrometer Observation
Using Micrometer for observation is now supported, since version 3.0, for the KafkaTemplate and listener containers.
Set observationEnabled on each component to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation.
CodePudding user response:
I was able to get it working by enabling observation on the listener container factory as well:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
        kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // The following line did the trick
    factory.getContainerProperties().setObservationEnabled(true);
    factory.setConsumerFactory(consumerFactory);
    return factory;
}