Home > database >  why Flink kafka client is trying to connect to localhost:9092 while It is set up to connect to 172.1
why Flink kafka client is trying to connect to localhost:9092 while It is set up to connect to 172.1

Time:12-09

I am trying to set up a flink jobmanager-taskmanager with docker-compose with this config:

version: "3.7"
services:
  jobmanagerconfig:
    image: flink:1.13.2-scala_2.12
    expose:
      - "6133"
      - "6123"
    ports:
      - "8085:8081"
    command: standalone-job --job-classname net.mongerbot.configManager.App
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
  taskmanagerconfig:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanagerconfig
    links:
      - jobmanagerconfig
    command: taskmanager
#    scale: 1
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
volumes:
  usrlib:

networks:
  default:
    external:
      name: mongerbot_network

The environment variables have the correct value in both containers. and as the log says the configured kafka client is set up to connect to 172.17.0.1:9092 as well:

docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,065 INFO  org.apache.kafka.clients.consumer.ConsumerConfig             [] - ConsumerConfig values: 
docker-taskmanagerconfig-1  |   allow.auto.create.topics = true
docker-taskmanagerconfig-1  |   auto.commit.interval.ms = 5000
docker-taskmanagerconfig-1  |   auto.offset.reset = latest
docker-taskmanagerconfig-1  |   bootstrap.servers = [172.17.0.1:9092]
docker-taskmanagerconfig-1  |   check.crcs = true
docker-taskmanagerconfig-1  |   client.dns.lookup = default
docker-taskmanagerconfig-1  |   client.id = 
docker-taskmanagerconfig-1  |   client.rack = 
docker-taskmanagerconfig-1  |   connections.max.idle.ms = 540000
docker-taskmanagerconfig-1  |   default.api.timeout.ms = 60000
docker-taskmanagerconfig-1  |   enable.auto.commit = true
docker-taskmanagerconfig-1  |   exclude.internal.topics = true
...

but this is the next lines of logs exactly after the kafka client log:

docker-taskmanagerconfig-1  |   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
docker-taskmanagerconfig-1  | 
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,084 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Subscribed to partition(s): config.subscribe-0, config.subscribe-2
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,090 INFO  org.apache.kafka.clients.Metadata                            [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,091 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,095 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,095 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1670492216094
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,096 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Subscribed to partition(s): config.subscribe-1
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,101 INFO  org.apache.kafka.clients.Metadata                            [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,102 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,103 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,104 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,197 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,207 WARN  org.apache.kafka.clients.NetworkClient 

and as you can see it is trying to connect to localhost:9092.

CodePudding user response:

There's actually no problem to what you're doing, as it's indicated in the logs:

Discovered group coordinator localhost:9092

So it's able to connect successfully. Now why you see 172.17.0.1 in the first place, and then localhost inside your kafka client logs? Well, localhost is just because of the env you passed to the application runtime. And the IP, that's because the raw name localhost which you provided in the configurations, needs to get resolved into some IP address, and you're not running it on your own machine natively, you're using docker. And eventually, 172.17.0.1 happens to be the docker host of the docker daemon of your machine. You can verify this in many ways, I'll link a post here to read more.

CodePudding user response:

this problem is not related to flink or kafka consumer. it was related to kafka server itself. the server should be configured to accept from 172.17.0.1 but it was setup to accept incoming request for kafka and localhost:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

I changed PLAINTEXT_HOST://localhost:9092 to PLAINTEXT_HOST://172.17.0.1:9092 and it fixed. (it was confusing because other clients (conduktor) could connect to the kafka with 172.17.0.1:9092 address even though this address was not in KAFKA_ADVERTISED_LISTENERS

  • Related