I have Flink (task manager and job manager) and Kafka running as docker images on my mac.
I have created a Flink job and deployed it.
The job uses FlinkKafkaConsumer
and FlinkKafkaProducer
and should consume from kafka and produce back to kafka.
Looks like the "bootstrap.servers"
I use (kafka:9092
) has no meaning for Flink which fails with:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
docker ps results
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b0cb56cb1941 flink:latest "/docker-entrypoint.…" 5 hours ago Up 5 hours 6123/tcp, 8081/tcp taskmanager
0c29ca57a5bb flink:latest "/docker-entrypoint.…" 5 hours ago Up 5 hours 6123/tcp, 0.0.0.0:8081->8081/tcp jobmanager
c0e2a3ffc8e0 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 29 hours ago Up 29 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
b9ae03f8f026 wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 29 hours 0.0.0.0:9092->9092/tcp kafka
docker network ls results
NETWORK ID NAME DRIVER SCOPE
1c09bfc0a2e9 bridge bridge local
268c4521f1de flink-network bridge local
2479a63017ab host host local
bb11245fba78 kafka_default bridge local
245dcc1e0d76 none null local
I also run:
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka
and used the IP I got as bootstrap.servers (172.18.0.2:9092
)
Update #1
I use a subset of docker-compose.yml (By Martijn Visser)
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091
schema-registry:
image: confluentinc/cp-schema-registry:7.0.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8091:8091"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091
rest-proxy:
image: confluentinc/cp-kafka-rest:7.0.0
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'
jobmanager:
image: flink:1.13.2-scala_2.12
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:1.13.2-scala_2.12
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2f465a0a4129 confluentinc/cp-kafka-rest:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 0.0.0.0:8082->8082/tcp rest-proxy
eb25992c47d0 confluentinc/cp-schema-registry:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 8081/tcp, 0.0.0.0:8091->8091/tcp schema-registry
1081319da296 confluentinc/cp-kafka:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp broker
de9056ee250c flink:1.13.2-scala_2.12 "/docker-entrypoint.…" 30 minutes ago Up 30 minutes 6123/tcp, 8081/tcp kafka_taskmanager_1
b38beefc35e3 confluentinc/cp-zookeeper:7.0.0 "/etc/confluent/dock…" 30 minutes ago Up 30 minutes 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
e6db23fa8842 flink:1.13.2-scala_2.12 "/docker-entrypoint.…" 30 minutes ago Up 30 minutes 6123/tcp, 0.0.0.0:8081->8081/tcp kafka_jobmanager_1
What is the correct bootstrap.servers value to be used?
How do I make Flink "see" Kafka?
CodePudding user response:
Most likely you'll have to configure KAFKA_ADVERTISED_LISTENERS
and point Flink to the configured value. For example, in my Docker setup at https://github.com/MartijnVisser/flink-only-sql I have the following configuration in my Docker compose file:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
jobmanager:
image: flink:1.14.0-scala_2.12
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:1.14.0-scala_2.12
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
I can then use broker:29092
as the Kafka bootstrap servers.