I have created a Spring Boot application with a Kafka producer and consumer. Both use the same properties and bootstrap servers. While the producer picks up the correct bootstrap server names, the consumer always picks up localhost:9092 as the bootstrap servers.
Consumer Config class:
@Configuration
@RequiredArgsConstructor
@Slf4j
@EnableKafka
public class KafkaConsumerConfig {

    private final KafkaConfig kafkaConfig;

    Map<String, Object> consumerProperties() {
        log.info("INSIDE consumerProperties:");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        log.info("ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: {}", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getConsumerGroupId());
        props.put("ssl.truststore.location", kafkaConfig.getSslTrustStoreLocation());
        props.put("ssl.truststore.password", kafkaConfig.getSslTrustStorePassword());
        props.put("ssl.keystore.location", kafkaConfig.getSslTrustStoreLocation());
        props.put("ssl.keystore.password", kafkaConfig.getSslTrustStorePassword());
        props.put("ssl.key.location", kafkaConfig.getSslTrustStoreLocation());
        props.put("ssl.key.password", kafkaConfig.getSslTrustStorePassword());
        props.put("security.protocol", kafkaConfig.getSecurityProtocol());
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfig.getConsumerAutoOffsetReset());
        log.info("props: {}", props);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleBetweenPolls(10000);
        return factory;
    }
}
KafkaConfig class
@ConfigurationProperties(prefix = "proj.kafka")
@Getter
@Setter
public class KafkaConfig {
    private String schemaRegistryUrl;
    private String bootstrapServers;
    private String securityProtocol;
    private String sslTrustStoreLocation;
    private String sslTrustStorePassword;
    private String consumerGroupId;
    private String consumerEnableAutoCommit;
    private String consumerAutoOffsetReset;
    private String consumerSessionTimeoutMs;
    private String producerRetries;
    private long consumerRetries;
    private long consumerBackoff;
    private String producerMaxInflightConnections;
    private String primaryCluster;
    private String secondaryCluster;
    private long delayMs;
    private long deleteMappingDelay;
    private String groupId;
}
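For completeness: a @ConfigurationProperties class like this only gets its fields bound if it is registered with the application context somewhere (via @Component, @EnableConfigurationProperties, or @ConfigurationPropertiesScan); otherwise the fields would silently stay null. A minimal sketch of one such registration (the class name here is hypothetical, since the question doesn't show how KafkaConfig is registered):

```java
// Hypothetical registration class: tells Spring Boot to bind
// proj.kafka.* properties into the KafkaConfig bean. Without a
// registration like this (or @Component on KafkaConfig itself),
// kafkaConfig.getBootstrapServers() would return null.
@Configuration
@EnableConfigurationProperties(KafkaConfig.class)
public class KafkaConfigRegistration {
}
```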
KafkaConsumerService class
@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaConsumerService {

    @Value("${proj.kafka.topic}")
    private String topicName;

    @KafkaListener(topics = "${proj.kafka.topic}", groupId = "${proj.kafka.consumer-group-id}")
    public void consumeMessage(ConsumerRecord<String, String> message) {
        log.info("INSIDE consumeMessage:");
        log.info("Topic: {}", topicName);
        log.info("Headers: {}", message.headers());
        log.info("Partition: {}", message.partition());
        log.info("Key: {}", message.key());
        log.info("Order: {}", message.value());
    }
}
Logs
2022-10-06 18:10:12.969 INFO 34048 --- [ main] c.v.a.v.e.config.KafkaConsumerConfig : INSIDE consumerFactory:
2022-10-06 18:10:12.969 INFO 34048 --- [ main] c.v.a.v.e.config.KafkaConsumerConfig : INSIDE consumerProperties:
2022-10-06 18:10:12.970 INFO 34048 --- [ main] c.v.a.v.e.config.KafkaConsumerConfig : ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:SSL://server1.com:9092,SSL://server2.com:9092,SSL://server3.com:9092
2022-10-06 18:10:14.500 INFO 34048 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-proj-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
application.properties
proj.kafka.bootstrap-servers=SSL://server1.com:9092,SSL://server2.com:9092,SSL://server3.com:9092
I have also tried adding the KafkaAdmin bean below to the consumer config class, but the consumer was still pointing to localhost:
@Value(value = "${proj.kafka.bootstrap-servers}")
private String bootstrapAddress;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}
I also tried adding the property below, but the consumer was still pointing to localhost:
props.put("spring.kafka.bootstrap-servers", kafkaConfig.getBootstrapServers());
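As I understand it, spring.kafka.bootstrap-servers is a Spring Boot application property rather than a Kafka client property, so putting it into the consumer props map presumably has no effect; it would normally go in application.properties instead (shown here with the same broker list as an assumption):

```properties
# Spring Boot property consumed by Kafka auto-configuration,
# not a key the Kafka consumer client itself understands:
spring.kafka.bootstrap-servers=SSL://server1.com:9092,SSL://server2.com:9092,SSL://server3.com:9092
```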
As I have mentioned, I am using the same properties and config settings for the producer, which points to the correct brokers. Please see the producer log below:
2022-10-06 21:09:33.949 INFO 9 --- [nio-8443-exec-3] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [SSL://server1.com:9092, SSL://server2.com:9092, SSL://server3.com:9092]
Please suggest how to point the consumer to the correct bootstrap servers.
CodePudding user response:
My issue got resolved by making the following changes:
- Renaming the factory method from concurrentKafkaListenerContainerFactory to kafkaListenerContainerFactory, which is the default bean name that @KafkaListener looks up; under the old name, Spring Boot's auto-configured factory (with the default localhost:9092) was being used instead
- Adding the two properties below:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
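Put together, a sketch of the renamed factory bean (assuming the same consumerFactory() wiring from the question):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    // @KafkaListener resolves its container factory by the bean name
    // "kafkaListenerContainerFactory" unless containerFactory is set
    // explicitly on the annotation, so this name matters.
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setIdleBetweenPolls(10000);
    return factory;
}
```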
Now the bootstrap server names are being picked up correctly, but the consumer is not reading any messages, although the KafkaConsumerService class is called each time a message is produced.
Please share any suggestions on this.
CodePudding user response:
How do you know that KafkaConsumerService is called each time a message is produced? Can you share the logs from KafkaConsumerService?