Small question regarding Kafka concurrency with Spring Kafka please.
I have one Kafka topic, theimportanttopic, to which many messages are sent.
This topic has exactly three partitions (call them theimportanttopic-0, theimportanttopic-1 and theimportanttopic-2).
It is known that Kafka does not allow multiple consumers from the same group to consume from the same partition, i.e., no two consumers within one group can both consume from theimportanttopic-0.
My Spring Kafka application code is as follows:
@Configuration
class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka.com:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // A value deserializer is required as well; without it the consumer fails to start.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1); // HERE: number of consumer threads in this instance
        return factory;
    }
}
@Component
class KafkaListenersExample {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaListenersExample.class);

    @KafkaListener(topics = "theimportanttopic", groupId = "uniquegroup")
    void listener(String data) {
        LOG.info(data);
        doSomethingImportantWithTheData(data);
    }
}
With that, I am having a hard time understanding the difference between the following two designs.
Suppose this application is already dockerized and a cloud environment is ready for use.
I can either run three pods with 1 CPU and 1 GB of memory each for design 1, or one pod with 3 CPUs and 3 GB of memory for design 2.
Design number 1: since this application runs in a container on a cloud platform such as Kubernetes, I spin up three instances of it. I will then have three of those "apps", and each one will consume from one of the three partitions.
kubectl get pods
my-app-AaAaAaAaAa-AaAaA
my-app-BbBbBbBbBb-BbBbB
my-app-CcCcCcCcCc-CcCcC
(and hypothetically, my-app-AaAaAaAaAa-AaAaA consumes theimportanttopic-0, my-app-BbBbBbBbBb-BbBbB consumes theimportanttopic-1, and my-app-CcCcCcCcCc-CcCcC consumes theimportanttopic-2)
Design number 2: alternatively, I run one, and only one, instance of my-app in a container and set the concurrency to 3 (same code as above, with a one-line change).
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // the only change: 3 instead of 1
        return factory;
    }
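As a rough mental model of design number 2, here is a plain-Java sketch (not Spring Kafka's actual implementation) in which one JVM starts one worker thread per partition. The class name ConcurrencySketch, the thread names and the in-memory record lists are all made up for illustration; the point is only that the three consumers live in one process and share that container's CPU and memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrencySketch {

    // Starts one thread per partition inside this single JVM and returns
    // the name of the thread that processed each partition.
    static Map<Integer, String> consume(Map<Integer, List<String>> recordsByPartition) {
        Map<Integer, String> threadByPartition = new ConcurrentHashMap<>();
        List<Thread> workers = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : recordsByPartition.entrySet()) {
            int partition = entry.getKey();
            List<String> records = entry.getValue();
            Thread worker = new Thread(() -> {
                for (String record : records) {
                    // The listener logic would run here, once per record.
                }
                threadByPartition.put(partition, Thread.currentThread().getName());
            }, "consumer-thread-" + partition); // hypothetical thread name
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until every partition has been drained
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return threadByPartition;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> records = Map.of(
                0, List.of("a", "b"),
                1, List.of("c"),
                2, List.of("d", "e"));
        System.out.println(consume(records));
    }
}
```

If this model is right, a crash of that one process takes all three worker threads down together.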
What are the differences between the two designs please?
Which one is preferred and why please?
This is not an opinion-based question. What are the performance, the cost, and the other pros and cons of design number 1 versus design number 2?
Thank you
CodePudding user response:
The difference is high availability.
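If you have a single pod consuming all three partitions and it stops, nothing is consumed until Kubernetes restarts or replaces that pod, which depends on how the pod is managed (its restartPolicy, or a controller such as a Deployment).
Alternatively, run a Deployment (or ReplicaSet) with replicas set to 3; the Kafka consumer group protocol then rebalances the partitions across the running pods whenever any one of them starts or stops.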
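To illustrate the availability point, here is a toy model in plain Java. It is only a sketch: the round-robin rule below stands in for Kafka's real partition assignors, and the class name RebalanceSketch and the pod names are made up. It shows how the three partitions are redistributed when one of three pods stops, and that nothing is assigned when the only pod stops.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RebalanceSketch {

    // Spreads partitions 0..partitionCount-1 over the live consumers so that
    // each partition has exactly one owner within the group (toy round-robin).
    static Map<String, List<Integer>> assign(List<String> liveConsumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : liveConsumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (liveConsumers.isEmpty()) {
            return assignment; // no consumer alive: every partition is left unread
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = liveConsumers.get(partition % liveConsumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three pods, all healthy.
        System.out.println(assign(List.of("pod-a", "pod-b", "pod-c"), 3));
        // prints {pod-a=[0], pod-b=[1], pod-c=[2]}

        // pod-b stops: the two survivors take over its partition.
        System.out.println(assign(List.of("pod-a", "pod-c"), 3));
        // prints {pod-a=[0, 2], pod-c=[1]}

        // A single pod (with any concurrency) stops: no consumers remain.
        System.out.println(assign(List.of(), 3));
        // prints {}
    }
}
```

In the second case consumption continues, just with less parallelism, until the missing pod comes back and triggers another rebalance.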
You can also look into KEDA to autoscale based on consumer lag.
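The idea behind lag-based scaling can be sketched in plain Java as follows. This is a simplified model, not KEDA's actual algorithm: the class name LagScalingSketch, the lagThreshold parameter and the minimum of one replica are assumptions made for this example. The cap at the partition count follows from the rule in the question that a partition has at most one consumer per group, so extra pods would sit idle.

```java
final class LagScalingSketch {

    // Lag per partition = latest offset in the log minus the group's committed offset.
    static long totalLag(long[] endOffsets, long[] committedOffsets) {
        long lag = 0;
        for (int i = 0; i < endOffsets.length; i++) {
            lag += Math.max(0, endOffsets[i] - committedOffsets[i]);
        }
        return lag;
    }

    // One replica per lagThreshold records of lag (rounded up), at least 1
    // (an assumption of this sketch) and never more than the partition count.
    static int desiredReplicas(long totalLag, long lagThreshold, int partitionCount) {
        long wanted = (totalLag + lagThreshold - 1) / lagThreshold; // ceiling division
        return (int) Math.max(1, Math.min(wanted, partitionCount));
    }

    public static void main(String[] args) {
        long lag = totalLag(new long[] {100, 50, 10}, new long[] {90, 50, 0});
        System.out.println(lag);                          // prints 20
        System.out.println(desiredReplicas(lag, 10, 3));  // prints 2
        System.out.println(desiredReplicas(1000, 10, 3)); // prints 3
    }
}
```

With three partitions, such a policy would move between one and three pods, which combines the low cost of a single pod when idle with the parallelism of three pods under load.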