I am trying to build my first application using Kafka as a messaging system, but I have some problems with running it. I am using Docker images of zookeeper and Kafka from wurstmeister with this docker-compose.yml:
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
restart: unless-stopped
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "toAll:1:1"
restart: unless-stopped
docker-compose up gives me following output:
kafka_1 | [2022-06-13 22:08:10,071] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2022-06-13 22:08:10,073] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2022-06-13 22:08:10,074] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2022-06-13 22:08:10,076] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2022-06-13 22:08:10,089] INFO Log directory /kafka/kafka-logs-44d30bad178c not found, creating it. (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,121] INFO Loading logs from log dirs ArraySeq(/kafka/kafka-logs-44d30bad178c) (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,123] INFO Attempting recovery for all logs in /kafka/kafka-logs-44d30bad178c since no clean shutdown file was found (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,128] INFO Loaded 0 logs in 7ms. (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,128] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,130] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1 | [2022-06-13 22:08:10,429] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
kafka_1 | [2022-06-13 22:08:10,433] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1 | [2022-06-13 22:08:10,465] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1 | [2022-06-13 22:08:10,489] INFO [broker-1049-to-controller-send-thread]: Starting (kafka.server.BrokerToControllerRequestThread)
kafka_1 | [2022-06-13 22:08:10,504] INFO [ExpirationReaper-1049-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,505] INFO [ExpirationReaper-1049-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,506] INFO [ExpirationReaper-1049-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,518] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1 | [2022-06-13 22:08:10,552] INFO Creating /brokers/ids/1049 (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1 | [2022-06-13 22:08:10,567] INFO Stat of the created znode at /brokers/ids/1049 is: 1191,1191,1655158090560,1655158090560,1,0,0,72057715199770624,202,0,1191
kafka_1 | (kafka.zk.KafkaZkClient)
kafka_1 | [2022-06-13 22:08:10,567] INFO Registered broker 1049 at path /brokers/ids/1049 with addresses: PLAINTEXT://127.0.0.1:9092, czxid (broker epoch): 1191 (kafka.zk.KafkaZkClient)
kafka_1 | [2022-06-13 22:08:10,623] INFO [ExpirationReaper-1049-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,627] INFO [ExpirationReaper-1049-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,628] INFO [ExpirationReaper-1049-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,640] INFO [GroupCoordinator 1049]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1 | [2022-06-13 22:08:10,644] INFO [GroupCoordinator 1049]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1 | [2022-06-13 22:08:10,664] INFO [ProducerId Manager 1049]: Acquired new producerId block (brokerId:1049,blockStartProducerId:25000,blockEndProducerId:25999) by writing to Zk with path version 26 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1 | [2022-06-13 22:08:10,665] INFO [TransactionCoordinator id=1049] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1 | [2022-06-13 22:08:10,668] INFO [TransactionCoordinator id=1049] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1 | [2022-06-13 22:08:10,668] INFO [Transaction Marker Channel Manager 1049]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1 | [2022-06-13 22:08:10,688] INFO [ExpirationReaper-1049-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2022-06-13 22:08:10,704] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
kafka_1 | [2022-06-13 22:08:10,722] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Starting socket server acceptors and processors (kafka.network.SocketServer)
kafka_1 | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1 | [2022-06-13 22:08:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1049] Started socket server acceptors and processors (kafka.network.SocketServer)
kafka_1 | [2022-06-13 22:08:10,739] INFO Kafka version: 2.8.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2022-06-13 22:08:10,739] INFO Kafka commitId: 839b886f9b732b15 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2022-06-13 22:08:10,740] INFO Kafka startTimeMs: 1655158090735 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2022-06-13 22:08:10,744] INFO [KafkaServer id=1049] started (kafka.server.KafkaServer)
zookeeper_1 | 2022-06-13 22:08:10,770 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001c35cf0000 type:multi cxid:0x3b zxid:0x4ab txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1 | [2022-06-13 22:08:10,798] INFO [broker-1049-to-controller-send-thread]: Recorded new controller, from now on will use broker 127.0.0.1:9092 (id: 1049 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka_1 | creating topics: toAll:1:1
zookeeper_1 | 2022-06-13 22:08:19,653 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /172.20.0.3:60284
zookeeper_1 | 2022-06-13 22:08:19,655 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /172.20.0.3:60284
zookeeper_1 | 2022-06-13 22:08:19,659 [myid:] - INFO [SyncThread:0:ZooKeeperServer@694] - Established session 0x100001c35cf0001 with negotiated timeout 30000 for client /172.20.0.3:60284
zookeeper_1 | 2022-06-13 22:08:19,803 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x100001c35cf0001
zookeeper_1 | 2022-06-13 22:08:19,808 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /172.20.0.3:60284 which had sessionid 0x100001c35cf0001
So, as those said: from now on will use broker 127.0.0.1:9092
and creating topics: toAll:1:1
i am guessing it should be ready to run my application, which is:
@SpringBootApplication
@Slf4j
public class KafkaTestApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTestApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("toAll").build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
String mess = "Test message";
template.send("toAll", mess);
log.info(String.format("Message sent: %s", mess));
};
}
}
But when I run it, I get this output:
2022-06-14 00:25:25.148 INFO 1108 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2022-06-14 00:25:25.254 INFO 1108 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.1
2022-06-14 00:25:25.255 INFO 1108 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 97671528ba54a138
2022-06-14 00:25:25.255 INFO 1108 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1655159125253
2022-06-14 00:25:27.299 INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:27.301 WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:29.454 INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:29.454 WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-06-14 00:25:31.724 INFO 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node -1 disconnected.
2022-06-14 00:25:31.724 WARN 1108 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
and last two lines repeats for a while to finally throw an exception.
I am fighting with this issue for two days, trying some other setups for Kafka, like kafka_listeners, kafka_advertised_listeners and so on... Some of them make Kafka not running properly, some change IPs, but non of them make my app gives different output. Do you some ideas what can cause this problem and how might it be fixed?
Thanks a lot in advance.
CodePudding user response:
Look at the output of docker ps
If you don't see 0.0.0.0:9092->9092/tcp
, then you've not forwarded the host to the necessary KAFKA_ADVERTISED_PORT
Setting this way in compose makes Docker pick a random host port to map 9092 to inside the container, and therefore localhost:9092
isn't a valid open port on your host.
ports:
- "9092"