I was able to inspect a particular topic for its partitions:
public void addPartitionIfNotExists(int partitionId) {
    Map<String, TopicDescription> games = kafkaAdmin.describeTopics("games");
    TopicDescription gamesTopicDescription = games.get("games");
    List<TopicPartitionInfo> partitionsInfo = gamesTopicDescription.partitions();
    boolean partitionIdExists = partitionsInfo.stream()
            .anyMatch(partitionInfo -> partitionInfo.partition() == partitionId);
    if (!partitionIdExists) {
        // missing part
    }
}
But I haven't been able to add a new partition to an already existing topic at runtime. I don't know if that is even possible.
CodePudding user response:
See the KafkaAdminOperations Javadocs for more info:
/**
* Create topics if they don't exist or increase the number of partitions if needed.
* @param topics the topics.
*/
void createOrModifyTopics(NewTopic... topics);
I'm not sure about your logic around partitionIdExists, though, since a partition in a Kafka topic is just an index number. Partition ids are assigned contiguously starting at 0, so if partition 3 exists, partitions 1 and 2 exist as well. Therefore the NewTopic API needs nothing more than numPartitions.
Technically, what you are asking for is already covered by createOrModifyTopics(), and that's it: you don't need to check the topic yourself.
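To make the index arithmetic concrete, here is a minimal sketch in plain Java. The class PartitionMath and its methods are made up for illustration; they are not part of Spring Kafka or the Kafka client. It only works out which numPartitions value you would ask for so that a given partition id exists.

```java
// Hypothetical helper for illustration only; not part of Spring Kafka or the Kafka client.
public final class PartitionMath {

    private PartitionMath() {
    }

    // Partition ids run from 0 to n-1, so partition id p needs at least p + 1 partitions.
    public static int requiredPartitionCount(int partitionId) {
        if (partitionId < 0) {
            throw new IllegalArgumentException("partitionId must be >= 0");
        }
        return partitionId + 1;
    }

    // A topic's partition count can only grow, so never ask for fewer than it already has.
    public static int targetPartitionCount(int currentCount, int partitionId) {
        return Math.max(currentCount, requiredPartitionCount(partitionId));
    }

    public static void main(String[] args) {
        System.out.println(targetPartitionCount(3, 5)); // prints 6
        System.out.println(targetPartitionCount(3, 1)); // prints 3
    }
}
```

With that number in hand, the missing part of the question's method could be a single call along the lines of kafkaAdmin.createOrModifyTopics(TopicBuilder.name("games").partitions(target).build()), assuming kafkaAdmin is a Spring KafkaAdmin. I have not run that call against a broker, so treat it as a starting point.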