I have to define topics in my application. I am using Spring Cloud Stream to configure messaging between application. Spring cloud stream uses kafka binder in the background. I wonder what is the best way to create topics with partitions and replication factor.I wonder if we should configure it itself, or we should make it using application.properties.? What about if partition number can be a dynamic value that depends on consumer number. Please give me hints to implement in correct way.
CodePudding user response:
You should be able to define @Bean NewTopic
resources in your application configuration, either parsed from properties, or in code.
The partition count cannot be dynamic. You should know how many consumers are necessary ahead of time for absolute ordering or for max parallelism. Partition count can only ever increase, but doing so will lose ordering within the topic
CodePudding user response:
Kafka binder provides a topic provisioner as part of its implementation (KafkaTopicProvisioner
). Therefore, if your broker allows automatic topic creation, the desired topic with the correct configurations can be created using the provisioner before any bindings take effect. If the topic does not already exist, the provisioner creates the topic for you. If you have a producer application, see this blog for a detailed explanation on the various configuration available for setting up partitions.
If you have a consumer application, then if the topic does not exist when the app starts, it creates the topic with instanceCount * concurrency
. See this implementation detail for that. If the topic exists, then it can still increase the number of partitions on the broker based on the instanceCount
and concurrency
settings, but it cannot decrease the number of partitions.
You can set the replication factor using the property - spring.cloud.stream.kafka.bindings.<input-binding>.consumer.topic.replicationFactor
for consumer apps and spring.cloud.stream.kafka.bindings.<ouptut-binding>.producer.topic.replicationFactor
for producer apps.
You can also provide replicaAssignments using the same property prefixes.
In addition, you can pass along any arbitrary topic configuration using
spring.cloud.stream.kafka.bindings.<input-binding>.consumer.topic.properties
or spring.cloud.stream.kafka.bindings.<output-binding>.producer.topic.properties