Home > Software design >  Spring boot Kafka Consumer Bootstrap Servers always picking Localhost 9092
Spring boot Kafka Consumer Bootstrap Servers always picking Localhost 9092

Time:10-07

I have created a Spring boot Application for Kafka Producer and Consumer. Both uses the same properties and bootstrap servers. While the Producer picks up the correct bootstrap server names, the consumer always picks up localhost:9092 as the bootstrap servers.

Consumer Config class:

@Configuration
    @RequiredArgsConstructor
    @Slf4j
    @EnableKafka
    public class KafkaConsumerConfig {
        private final KafkaConfig kafkaConfig;
         Map<String, Object> consumerProperties() {
            log.info("INSIDE consumerProperties:");
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
            log.info("ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:" props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getConsumerGroupId());
            props.put("ssl.truststore.location", kafkaConfig.getSslTrustStoreLocation());
            props.put("ssl.truststore.password", kafkaConfig.getSslTrustStorePassword());
            props.put("ssl.keystore.location", kafkaConfig.getSslTrustStoreLocation());
            props.put("ssl.keystore.password", kafkaConfig.getSslTrustStorePassword());
            props.put("ssl.key.location", kafkaConfig.getSslTrustStoreLocation());
            props.put("ssl.key.password", kafkaConfig.getSslTrustStorePassword());
            props.put("security.protocol", kafkaConfig.getSecurityProtocol());
            props.put("ssl.endpoint.identification.algorithm", "");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfig.getConsumerAutoOffsetReset());
            log.info("props:" props);
            return props;
        }
        @Bean
        public ConsumerFactory<String, String> consumerFactory()
        {
            return new DefaultKafkaConsumerFactory<>(consumerProperties());
        }
        @Bean
        public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory()
        {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setIdleBetweenPolls(10000);
            return factory;
        }
    }

KafkaConfig class

@ConfigurationProperties(prefix = "proj.kafka")
@Getter
@Setter
public class KafkaConfig {
  private String schemaRegistryUrl;
  private String bootstrapServers;
  private String securityProtocol;
  private String sslTrustStoreLocation;
  private String sslTrustStorePassword;
  private String consumerGroupId;
  private String consumerEnableAutoCommit;
  private String consumerAutoOffsetReset;
  private String consumerSessionTimeoutMs;
  private String producerRetries;
  private long consumerRetries;
  private long consumerBackoff;
  private String producerMaxInflightConnections;
  private String primaryCluster;
  private String secondaryCluster;
  private long delayMs;
  private long deleteMappingDelay;
  private String groupId;
}

KafkaConsumerService class

@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaConsumerService {
    @Value("${proj.kafka.topic}")
    private String topicName;

    @KafkaListener(topics = "${proj.kafka.topic}", groupId = "${proj.kafka.consumer-group-id}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws InterruptedException {
        log.info("INSIDE consumeMessage:");
        log.info("Tópic:", topicName);
        log.info("Headers:", message.headers());
        log.info("Partion:", message.partition());
        log.info("key:", message.key());
        log.info("Order:", message.value())
        }
}

Logs

2022-10-06 18:10:12.969  INFO 34048 --- [           main] c.v.a.v.e.config.KafkaConsumerConfig     : INSIDE consumerFactory:
2022-10-06 18:10:12.969  INFO 34048 --- [           main] c.v.a.v.e.config.KafkaConsumerConfig     : INSIDE consumerProperties:
2022-10-06 18:10:12.970  INFO 34048 --- [           main] c.v.a.v.e.config.KafkaConsumerConfig     : 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:SSL://server1.com:9092,SSL://server2.com:9092,SSL://server3.com:9092

2022-10-06 18:10:14.500  INFO 34048 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-proj-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false

application.properties

proj.kafka.bootstrap-servers=SSL://server1.com:9092,SSL://server2.com:9092,SSL://server3.com:9092

I have also tried adding the below admin code in Consumer Config class, but it was still pointing to the localhost

 @Value(value = "${proj.kafka.bootstrap-servers}")
    private String bootstrapAddress;
        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
            return new KafkaAdmin(configs);
        }

I also modified the below prop, but it was still pointing to the localhost

props.put("spring.kafka.bootstrap-servers", kafkaConfig.getBootstrapServers());

As i have mentioned, i am using the same properties and config setting for Producer, which is pointing to the correct brokers. Please see below producer log:

2022-10-06 21:09:33.949  INFO 9 --- [nio-8443-exec-3] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
acks = 1
batch.size = 16384
bootstrap.servers = [SSL://server1.com:9092, SSL://server2.com:9092, SSL://server3.com:9092]

Please suggest on how to point the Consumer to the correct bootstrap servers

CodePudding user response:

My issue got resolved by making following changes:

  1. Replacing the method name from concurrentKafkaListenerContainerFactory to kafkaListenerContainerFactory
  2. Adding below 2 properties: props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

Now the bootstrap server names are getting picked up correctly, but the consumer is not reading any messages although the KafkaConsumerService class is getting called each time any message is being produced.

Please share if anyone has any suggestion on this

CodePudding user response:

How do you know that KafkaConsumerService is called when each message is produced? Can you add KafkaConsumerService?

  • Related