I'm developing a Spring Boot application that exposes APIs to subscribe to and unsubscribe from Kafka topics. All we need to do is pass the topic name
in the API call, and the application will subscribe to it and consume messages.
Subscribe topic API :
{FQDN}/sub/{topic-name}
Unsubscribe topic API :
{FQDN}/unsub/{topic-name}
Dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
I have created a KafkaConsumerConfiguration class
in which I declared the following beans.
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {

    private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    @Bean
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
I call ConcurrentMessageListenerContainer.start()
when the subscribe API is invoked for a topic name.
@Service
public class KafkaConsumerService {

    // SLF4J logger (assumed; the original snippet omits the declaration of `log`)
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    // Stores the running containers by topic, so that when we need to stop/unsub
    // a topic we can look up its container. ConcurrentHashMap is used because the
    // API endpoints may be called from several request threads at once.
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap =
            new ConcurrentHashMap<>();

    @Autowired
    private ConsumerFactory<String, String> kafkaConsumerFactory;

    public void createConsumers(String topic, MessageListener<String, String> messageListener) {
        log.info("creating kafka consumer for topic {}", topic);
        ContainerProperties containerProps = new ContainerProperties(topic);
        containerProps.setPollTimeout(100);
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
        container.setupMessageListener(messageListener);
        container.start();
        consumersMap.put(topic, container);
        log.info("created and started kafka consumer for topic {}", topic);
    }

    public void stopConsumer(String topic) {
        log.info("stopping consumer for topic {}", topic);
        // Remove the entry so a stopped container is not kept around in the map
        ConcurrentMessageListenerContainer<String, String> container = consumersMap.remove(topic);
        if (container != null) {
            container.stop();
            log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
        }
    }
}
This solution works fine:
- It subscribes to and unsubscribes from Kafka topics through the APIs.
- It also receives the messages properly.
The issue is that every time I call the API to subscribe to a topic, it calls AdminClient.create(props)
(line 336 in the image below).
This generates the following logs:
2022-04-26 01:31:11.676 INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1650918672031
I don't want an AdminClient instance to be created each time, because this way it takes around 2 seconds to create a subscription to the topic, which is unacceptable in my use case.
Required Solution:
The Kafka connection should be created only once. Then I can use that connection to sub/unsub the Kafka topics, so that the connection is not recreated on every call and subscribing becomes faster.
Thanks.
CodePudding user response:
2.3.x has been out of support for a long time now:
https://spring.io/projects/spring-kafka#support
The last 2.3.x version was 2.3.14, released last July.
The admin client is used to check whether the topic exists. That check is controlled by the missingTopicsFatal
property in ContainerProperties; it defaults to true in the version you are using.
In later versions (since 2.3.4), it defaults to false, so an AdminClient will not be created when you start the container.
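If upgrading is not immediately possible, the same effect can probably be had on 2.3.1 by turning the property off explicitly. The following is a minimal sketch, not a definitive implementation: the class name ContainerStarter and the method name start are hypothetical, while setMissingTopicsFatal is the ContainerProperties setter for the property discussed above. The trade-off is that a topic that does not exist is no longer reported when the container starts.

```java
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

// Hypothetical helper class, for illustration only.
public class ContainerStarter {

    public static ConcurrentMessageListenerContainer<String, String> start(
            ConsumerFactory<String, String> consumerFactory,
            String topic,
            MessageListener<String, String> listener) {

        ContainerProperties containerProps = new ContainerProperties(topic);
        containerProps.setPollTimeout(100);
        // Skip the topic-existence check at start, which is what
        // creates the AdminClient in the question's logs.
        containerProps.setMissingTopicsFatal(false);

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.setupMessageListener(listener);
        container.start();
        return container;
    }
}
```

In the question's KafkaConsumerService, this amounts to adding the single setMissingTopicsFatal(false) line after the ContainerProperties is created in createConsumers.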
But you really need to upgrade to a supported version (2.8.5 advised; 2.7.x goes out of OSS support soon).
There is no support for reusing a consumer with different topics; a new consumer will be created each time you start the container.