Spring application cannot connect to a Kafka broker

Time:06-15


I am trying to build my first application using Kafka as a messaging system, but I am having trouble running it. I am using the wurstmeister Docker images for ZooKeeper and Kafka with this docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "toAll:1:1"
    restart: unless-stopped

docker-compose up gives me the following output:

kafka_1      | [2022-06-13 22:08:10,071] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,073] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,074] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,076] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1      | [2022-06-13 22:08:10,089] INFO Log directory /kafka/kafka-logs-44d30bad178c not found, creating it. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,121] INFO Loading logs from log dirs ArraySeq(/kafka/kafka-logs-44d30bad178c) (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,123] INFO Attempting recovery for all logs in /kafka/kafka-logs-44d30bad178c since no clean shutdown file was found (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,128] INFO Loaded 0 logs in 7ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,128] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,130] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1      | [2022-06-13 22:08:10,429] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
kafka_1      | [2022-06-13 22:08:10,433] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1      | [2022-06-13 22:08:10,465] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,489] INFO [broker-1049-to-controller-send-thread]: Starting (kafka.server.BrokerToControllerRequestThread)
kafka_1      | [2022-06-13 22:08:10,504] INFO [ExpirationReaper-1049-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,505] INFO [ExpirationReaper-1049-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,518] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1      | [2022-06-13 22:08:10,552] INFO Creating /brokers/ids/1049 (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,567] INFO Stat of the created znode at /brokers/ids/1049 is: 1191,1191,1655158090560,1655158090560,1,0,0,72057715199770624,202,0,1191
kafka_1      |  (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,567] INFO Registered broker 1049 at path /brokers/ids/1049 with addresses: PLAINTEXT://127.0.0.1:9092, czxid (broker epoch): 1191 (kafka.zk.KafkaZkClient)
kafka_1      | [2022-06-13 22:08:10,623] INFO [ExpirationReaper-1049-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,627] INFO [ExpirationReaper-1049-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,628] INFO [ExpirationReaper-1049-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,640] INFO [GroupCoordinator 1049]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-06-13 22:08:10,644] INFO [GroupCoordinator 1049]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-06-13 22:08:10,664] INFO [ProducerId Manager 1049]: Acquired new producerId block (brokerId:1049,blockStartProducerId:25000,blockEndProducerId:25999) by writing to Zk with path version 26 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1      | [2022-06-13 22:08:10,665] INFO [TransactionCoordinator id=1049] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2022-06-13 22:08:10,668] INFO [TransactionCoordinator id=1049] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2022-06-13 22:08:10,668] INFO [Transaction Marker Channel Manager 1049]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1      | [2022-06-13 22:08:10,688] INFO [ExpirationReaper-1049-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2022-06-13 22:08:10,704] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
kafka_1      | [2022-06-13 22:08:10,722] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Starting socket server acceptors and processors (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started socket server acceptors and processors (kafka.network.SocketServer)
kafka_1      | [2022-06-13 22:08:10,739] INFO Kafka version: 2.8.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,739] INFO Kafka commitId: 839b886f9b732b15 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,740] INFO Kafka startTimeMs: 1655158090735 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2022-06-13 22:08:10,744] INFO [KafkaServer id=1049] started (kafka.server.KafkaServer)
zookeeper_1  | 2022-06-13 22:08:10,770 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001c35cf0000 type:multi cxid:0x3b zxid:0x4ab txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1      | [2022-06-13 22:08:10,798] INFO [broker-1049-to-controller-send-thread]: Recorded new controller, from now on will use broker 127.0.0.1:9092 (id: 1049 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka_1      | creating topics: toAll:1:1
zookeeper_1  | 2022-06-13 22:08:19,653 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,655 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,659 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@694] - Established session 0x100001c35cf0001 with negotiated timeout 30000 for client /172.20.0.3:60284
zookeeper_1  | 2022-06-13 22:08:19,803 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x100001c35cf0001
zookeeper_1  | 2022-06-13 22:08:19,808 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /172.20.0.3:60284 which had sessionid 0x100001c35cf0001 

Since the log says "from now on will use broker 127.0.0.1:9092" and "creating topics: toAll:1:1", I assume the broker is ready for my application, which is:

@SpringBootApplication
@Slf4j
public class KafkaTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaTestApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("toAll").build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            String mess = "Test message";
            template.send("toAll", mess);
            log.info(String.format("Message sent: %s", mess));
        };
    }
}

But when I run it, I get this output:

2022-06-14 00:25:25.148  INFO 1108 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2022-06-14 00:25:25.254  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-06-14 00:25:25.255  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-06-14 00:25:25.255  INFO 1108 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1655159125253
2022-06-14 00:25:27.299  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:27.301  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:29.454  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:29.454  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:31.724  INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:31.724  WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

The last two lines repeat for a while until an exception is finally thrown.

I have been fighting with this issue for two days, trying other Kafka settings such as KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS and so on. Some of them stop Kafka from running properly and some change the IPs, but none of them makes my app produce different output. Do you have any ideas what could cause this problem and how it might be fixed?

Thanks a lot in advance.

CodePudding user response:

Look at the output of docker ps.

If you don't see 0.0.0.0:9092->9092/tcp, then host port 9092 is not forwarded to the container port named in KAFKA_ADVERTISED_PORT.

The entry below makes Docker pick a random host port and map it to 9092 inside the container, so localhost:9092 is not an open port on your host. Use a fixed mapping such as "9092:9092" instead of:

    ports:
      - "9092"
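
To confirm from the host whether anything is actually listening on localhost:9092, a plain TCP connect attempt is enough. The following is only a minimal sketch of such a check; the class name PortProbe and the helper isPortOpen are hypothetical and are not part of Kafka or Spring.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    // Hypothetical helper: returns true if a TCP connection to host:port
    // can be established within timeoutMs, false otherwise.
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the bootstrap.servers value from the question's log.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean open = isPortOpen(host, port, 2000);
        System.out.println(host + ":" + port + (open ? " is reachable" : " is NOT reachable"));
    }
}
```

If this reports the port as not reachable while the broker container is up, the port mapping is the likely culprit. A reachable port only shows that TCP works; the client must still be able to reach the address the broker advertises.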

Related: Connect to Kafka running in Docker
