In the docs it is stated that after you set concurrency on ConcurrentKafkaListenerContainerFactory, you get that many instances of KafkaMessageListenerContainer. They also say that, in their example (three topics with five partitions each and a concurrency of 15), using the RangeAssignor strategy you should see 10 idle KafkaMessageListenerContainers. I cannot understand why that is. I read that section and tested it:
I have this in my producer-ms:
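import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Creates three compacted topics with 5 partitions each (15 partitions in total)
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("first_topic1")
                .partitions(5)
                .replicas(1)
                .compact()
                .build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("first_topic2")
                .partitions(5)
                .replicas(1)
                .compact()
                .build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("first_topic3")
                .partitions(5)
                .replicas(1)
                .compact()
                .build();
    }
}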
In receiver-ms:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // applies Spring Boot's spring.kafka.* settings, including the consumer factory
    configurer.configure(factory, kafkaConsumerFactory);
    // redundant: configure(...) above already sets this consumer factory
    factory.setConsumerFactory(kafkaConsumerFactory);
    // 15 child KafkaMessageListenerContainers, each with its own consumer in the group
    factory.setConcurrency(15);
    return factory;
}
Consumer configuration, including the assignment strategy:
consumer:
  key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  auto-startup: true
  properties:
    partition:
      assignment:
        strategy: org.apache.kafka.clients.consumer.RangeAssignor
And indeed I get those 10 idle KafkaMessageListenerContainers: only five of the fifteen are assigned partitions:
2023-02-04T13:55:48.780+01:00 INFO 3960 --- [tainer#0-10-C-1] o.s.k.l.KafkaMessageListenerContainer : my-first-application: partitions assigned: [first_topic1-2, first_topic2-2, first_topic3-2]
2023-02-04T13:55:48.780+01:00 INFO 3960 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : my-first-application: partitions assigned: [first_topic1-0, first_topic2-0, first_topic3-0]
2023-02-04T13:55:48.780+01:00 INFO 3960 --- [tainer#0-11-C-1] o.s.k.l.KafkaMessageListenerContainer : my-first-application: partitions assigned: [first_topic1-3, first_topic2-3, first_topic3-3]
2023-02-04T13:55:48.780+01:00 INFO 3960 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer : my-first-application: partitions assigned: [first_topic1-1, first_topic2-1, first_topic3-1]
2023-02-04T13:55:48.780+01:00 INFO 3960 --- [tainer#0-12-C-1] o.s.k.l.KafkaMessageListenerContainer : my-first-application: partitions assigned: [first_topic1-4, first_topic2-4, first_topic3-4]
I cannot make sense of the Javadoc for RangeAssignor. We have 3 topics with 5 partitions each, 15 partitions in total. The Javadoc says: 'We then divide the number of partitions by the total number of consumers'. What is a consumer in this case? Is it my microservice, which joins the consumer group, or is it each of the fifteen KafkaMessageListenerContainers?
Then it says: 'If it does not evenly divide, then the first few consumers will have one extra partition', but here it does divide evenly: 15/3 = 5.
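The quoted Javadoc sentences are applied to one topic at a time, not to the 15 partitions as a whole. The stdlib-only Java sketch below simulates that per-topic rule. It is a simplified illustration, not Kafka's actual RangeAssignor code, and the member names c00 to c14 are hypothetical stand-ins chosen so that their lexicographic order matches their index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    /** Simplified per-topic range assignment, following the RangeAssignor Javadoc description. */
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // consumers in lexicographic order
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : sorted) {
            assignment.put(c, new ArrayList<>());
        }
        // each topic is divided separately
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int numPartitions = topic.getValue();
            int perConsumer = numPartitions / sorted.size(); // 5 / 15 = 0
            int extra = numPartitions % sorted.size();       // 5 % 15 = 5
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                // the first `extra` consumers get one extra partition of this topic
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            consumers.add(String.format("c%02d", i)); // hypothetical member IDs
        }
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("first_topic1", 5);
        topics.put("first_topic2", 5);
        topics.put("first_topic3", 5);

        Map<String, List<String>> assignment = assign(consumers, topics);
        assignment.forEach((c, parts) -> System.out.println(c + " -> " + parts));
        long idle = assignment.values().stream().filter(List::isEmpty).count();
        System.out.println("idle consumers: " + idle);
    }
}
```

Running main shows c00 to c04 each holding partition N of all three topics and reports 10 idle consumers, the same shape as the log above.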
I would really appreciate some help here.
A second, related question: why would we ever choose RangeAssignor over CooperativeStickyAssignor? With RangeAssignor the first consumers always get more partitions, and it is an eager strategy, i.e. every rebalance is a stop-the-world event, while CooperativeStickyAssignor is not.
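For comparison, here is a sketch that deals all 15 partitions out as a single pool, round-robin style. It is plain round-robin, not CooperativeStickyAssignor's sticky algorithm; the only assumption it illustrates is that, for a fresh group whose members all subscribe to the same three topics, a balanced assignor gives each of the 15 consumers exactly one partition. The member names c00 to c14 are again hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PooledAssignmentSketch {

    /** Deals the partitions of all topics to the consumers in turn (round-robin over one pool). */
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // consumers in lexicographic order
        Map<String, List<String>> assignment = new TreeMap<>();
        sorted.forEach(c -> assignment.put(c, new ArrayList<>()));
        int turn = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                // next consumer in line gets this partition, wrapping around
                String consumer = sorted.get(turn++ % sorted.size());
                assignment.get(consumer).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            consumers.add(String.format("c%02d", i)); // hypothetical member IDs
        }
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("first_topic1", 5);
        topics.put("first_topic2", 5);
        topics.put("first_topic3", 5);

        Map<String, List<String>> assignment = assign(consumers, topics);
        assignment.forEach((c, parts) -> System.out.println(c + " -> " + parts));
        long idle = assignment.values().stream().filter(List::isEmpty).count();
        System.out.println("idle consumers: " + idle);
    }
}
```

This leaves no consumer idle, but unlike the log above, where each busy container holds the same partition number of every topic, it no longer keeps matching partition numbers together on one consumer.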
Answer:
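The first sentence of the documentation that you've linked states explicitly:
The range assignor works on a per-topic basis.
That means that each of the three 5-partition topics is divided between the consumers separately. A consumer here is each KafkaMessageListenerContainer, since each one runs its own KafkaConsumer and joins the group as a separate member, so you have 15 consumers. For a single topic, 5 partitions / 15 consumers = 0 with a remainder of 5, so the first 5 consumers (in the assignor's sorted order) each get one extra partition, i.e. exactly one partition of that topic, and the other 10 get nothing. The same 5 consumers come first for every topic, so each of them receives one partition of each topic and the rest stay idle.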
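Which five are "first"? The RangeAssignor Javadoc says consumers are laid out in lexicographic order, and the broker builds each member ID from the client ID plus "-" and a UUID. Assuming, hypothetically, that container #0-i uses the default client ID consumer-my-first-application-(i+1), a plain string sort puts the suffixes 1, 10, 11, 12, 13 first, which would match the log, where containers 0, 9, 10, 11 and 12 received partitions. The sketch below only demonstrates that sort; the client-ID pattern is an assumption, not something confirmed by the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class MemberOrderSketch {

    /** Container indexes in the order a lexicographic sort of their member IDs puts them. */
    static List<Integer> sortedContainerIndexes(String groupId, int concurrency) {
        String prefix = "consumer-" + groupId + "-";
        List<String> memberIds = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            // ASSUMPTION: container #0-i has client ID "consumer-<group>-<i+1>",
            // and the member ID is that client ID plus "-" and a random UUID
            memberIds.add(prefix + (i + 1) + "-" + UUID.randomUUID());
        }
        memberIds.sort(null); // plain String (lexicographic) order
        List<Integer> order = new ArrayList<>();
        for (String id : memberIds) {
            // recover the numeric client-ID suffix and map it back to the container index
            String rest = id.substring(prefix.length());
            int clientNumber = Integer.parseInt(rest.substring(0, rest.indexOf('-')));
            order.add(clientNumber - 1);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Integer> order = sortedContainerIndexes("my-first-application", 15);
        // with 5 partitions per topic, only the first 5 members in this order get partitions
        System.out.println("containers that receive partitions: " + order.subList(0, 5));
    }
}
```

The order is decided before the UUID part is reached, so the result is the same on every run: the string "1-" sorts before "10-" because '-' precedes '0', and "15-" sorts before "2-".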