Home > database >  Cannot connect to the Kafka docker instance
Cannot connect to the Kafka docker instance

Time:05-31

I am using bitnami/kafka:latest and bitnami/zookeper:latest images.

My sample project in Github: https://github.com/KostasD21/kafka-docker-demo

if you run the command docker-compose up and then run the application, you will see that the Spring Boot application cannot connect to the kafka container (which is currently running under the host kafka:9092)

The stacktrace from the application:

2022-05-31 08:59:18.255  WARN 144414 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-foo-1, groupId=foo] Error connecting to node kafka:9092 (id: 1001 rack: null)

java.net.UnknownHostException: kafka
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367) ~[na:na]
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301) ~[na:na]
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247) ~[spring-kafka-2.8.4.jar:2.8.4]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

logs from the docker container:

2022-05-31 05:51:24,236] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently active brokers in the cluster: Set(1001) (kafka.controller.KafkaController)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Current list of topics in the cluster: Set(__consumer_offsets, test-topic) (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Fetching topic deletions in progress (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics to be deleted:  (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics ineligible for deletion:  (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] Initializing topic deletion manager (kafka.controller.KafkaController)[2022-05-31 05:51:24,242] INFO [Topic Deletion Manager 1001] Initializing manager with initial deletions: Set(), initial ineligible deletions: Set() (kafka.controller.TopicDeletionManager)[2022-05-31 05:51:24,242] INFO [Controller id=1001] Sending update metadata request (kafka.controller.KafkaController)[2022-05-31 05:51:24,243] INFO [ExpirationReaper--1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2022-05-31 05:51:24,244] INFO [Controller id=1001 epoch=3] Sending UpdateMetadata request to brokers Set(1001) for 0 partitions (state.change.logger)[2022-05-31 05:51:24,249] INFO [ReplicaStateMachine controllerId=1001] Initializing replica state (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,252] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,253] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,258] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)

Currently I am stuck on this communication. The problem has to do with docker networking probably but I am not an expert on this aspect.

CodePudding user response:

Check this answer: https://stackoverflow.com/a/51634499/19059974

From the point of view of your application, Kafka is listening in localhost (kafka hostname would be if it is running in within the same docker network)

Essentially, you need to add a protocol mapper and a listener that announces that kafka is reachable at localhost (see previous answer), change the port where your kafka container binds to 29092 (in your docker-compose file) and have your app connect to localhost:29092

  • Related