I am trying to set up a Flink JobManager and TaskManager with docker-compose, using this config:
version: "3.7"
services:
  jobmanagerconfig:
    image: flink:1.13.2-scala_2.12
    expose:
      - "6133"
      - "6123"
    ports:
      - "8085:8081"
    command: standalone-job --job-classname net.mongerbot.configManager.App
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
  taskmanagerconfig:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanagerconfig
    links:
      - jobmanagerconfig
    command: taskmanager
    # scale: 1
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
volumes:
  usrlib:
networks:
  default:
    external:
      name: mongerbot_network
The environment variables have the correct values in both containers, and as the log shows, the Kafka client is configured to connect to 172.17.0.1:9092:
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,065 INFO org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values:
docker-taskmanagerconfig-1 | allow.auto.create.topics = true
docker-taskmanagerconfig-1 | auto.commit.interval.ms = 5000
docker-taskmanagerconfig-1 | auto.offset.reset = latest
docker-taskmanagerconfig-1 | bootstrap.servers = [172.17.0.1:9092]
docker-taskmanagerconfig-1 | check.crcs = true
docker-taskmanagerconfig-1 | client.dns.lookup = default
docker-taskmanagerconfig-1 | client.id =
docker-taskmanagerconfig-1 | client.rack =
docker-taskmanagerconfig-1 | connections.max.idle.ms = 540000
docker-taskmanagerconfig-1 | default.api.timeout.ms = 60000
docker-taskmanagerconfig-1 | enable.auto.commit = true
docker-taskmanagerconfig-1 | exclude.internal.topics = true
...
But these are the log lines that follow immediately after the Kafka client configuration:
docker-taskmanagerconfig-1 | value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
docker-taskmanagerconfig-1 |
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,084 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Subscribed to partition(s): config.subscribe-0, config.subscribe-2
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,090 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,091 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,094 WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,094 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,094 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,095 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,095 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1670492216094
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,096 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Subscribed to partition(s): config.subscribe-1
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,101 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,102 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,103 WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,104 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,197 WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1 | 2022-12-08 09:36:56,207 WARN org.apache.kafka.clients.NetworkClient
As you can see, it is trying to connect to localhost:9092.
CodePudding user response:
There's actually no problem with what you're doing, as the logs indicate:
Discovered group coordinator localhost:9092
So it's able to connect successfully. Now, why do you see 172.17.0.1 in the first place, and then localhost in your Kafka client logs? Well, localhost is just because of the env you passed to the application runtime. As for the IP: the raw name localhost, which you provided in the configuration, needs to be resolved to some IP address, and you're not running natively on your own machine, you're using Docker. 172.17.0.1 happens to be the address of the Docker host as seen from containers on the default bridge network. You can verify this in many ways; I'll link a post here to read more.
CodePudding user response:
This problem is not related to Flink or the Kafka consumer. It was caused by the Kafka server's own configuration. The broker needs to advertise an address that the client can reach (172.17.0.1), but it was set up to advertise only kafka and localhost:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
I changed PLAINTEXT_HOST://localhost:9092 to PLAINTEXT_HOST://172.17.0.1:9092 and that fixed it.
(It was confusing because other clients (Conduktor) could connect to Kafka at the 172.17.0.1:9092 address even though this address was not in KAFKA_ADVERTISED_LISTENERS.)
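For reference, the change comes down to a one-line edit in the broker's environment block. The fragment below is a minimal sketch, not a complete file: it assumes the Flink containers reach the broker through the Docker bridge gateway at 172.17.0.1 (the address from the question's bootstrap.servers, which may differ on other hosts) and that port 9092 stays published on the host. Every other setting stays as in the original compose file.

```yaml
services:
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092:9092
    environment:
      # Advertised listeners are the addresses the broker hands back to clients
      # after the bootstrap connection, so each one must be reachable from the
      # client that uses it. 172.17.0.1 is an assumption taken from the
      # question's logs; replace it with your own bridge gateway address.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.17.0.1:9092
      # ... remaining KAFKA_* settings unchanged ...
```

A Kafka client uses bootstrap.servers only for its first connection and then reconnects to whatever address the broker advertises for that listener. This matches the logs in the question: the bootstrap to 172.17.0.1:9092 succeeds (the cluster ID is fetched), and the failures start only once the client follows the advertised localhost:9092, which inside the TaskManager container points at the container itself. It would likely also explain the Conduktor observation, if Conduktor was running directly on the host: there, localhost:9092 still leads to the published broker port.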