I am writing a little Kafka metrics exporter (Yes there are loads available like prometheus etc but I want a light weight custom one. Kindly excuse me on this).
As part of this I would like to know as soon as first message is received (or topic has messages) in a Kafka topic. I am using Spring Boot and Kafka.
I have the below code which gives the name of the topic and number of partitions. I want to know if the topic has messages? Kindly let me know how can I get this stat. Any lead is much appreciated!
@ReadOperation
public List<TopicManifest> kafkaTopic() throws ExecutionException, InterruptedException {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
Set<String> topics = listTopicsResult.names().get().stream().filter(topic -> !topic.startsWith("_")).collect(Collectors.toSet());
System.out.println(topics);
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
Map<String, KafkaFuture<TopicDescription>> topicNameValues = describeTopicsResult.topicNameValues();
List<TopicManifest> topicManifests = topicNameValues.entrySet().stream().map(entry -> {
try {
TopicDescription topicDescription = entry.getValue().get();
return TopicManifest.builder().name(entry.getKey())
.noOfPartitions(topicDescription.partitions().size())
.build();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
return topicManifests;
}
CodePudding user response:
Create a KafkaConsumer
and call endOffsets
(the consumer does not need to be subscribed to the topic(s)).
@Bean
ApplicationRunner runner1(ConsumerFactory cf) {
return args -> {
try (Consumer consumer = cf.createConsumer()) {
System.out.println(consumer.endOffsets(List.of(new TopicPartition("ktest29", 0),
new TopicPartition("ktest29", 1),
new TopicPartition("ktest29", 2))));
}
};
}