How to set up airflow worker to allow webserver fetch logs on different machine with docker?

I recently installed Airflow 2.1.4 with Docker containers, and I've successfully set up Postgres, Redis, the scheduler, 2x local workers, and Flower on the same machine with docker-compose.

Now I want to expand, and set up workers on other machines.

I was able to get the workers up and running: Flower finds the worker node and the worker receives tasks from the scheduler correctly, but regardless of the actual result of the task, the task gets marked as failed with an error message like the one below:

*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Fetching from: http://b7a0154e7e20:8793/log/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Failed to fetch log file from worker. [Errno -3] Temporary failure in name resolution
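For reference, the URL the webserver tries appears to be composed from the worker's hostname, the task's identifiers, and the worker log-server port (8793 by default); a minimal sketch of that pattern, inferred from the error messages above:

```python
# Sketch of the log-fetch URL pattern visible in the error messages above
# (8793 is Airflow's default worker_log_server_port).
def log_fetch_url(hostname: str, dag_id: str, task_id: str,
                  execution_date: str, try_number: int) -> str:
    return (f"http://{hostname}:8793/log/"
            f"{dag_id}/{task_id}/{execution_date}/{try_number}.log")

print(log_fetch_url("b7a0154e7e20", "test", "test",
                    "2021-10-29T14:38:37.669734+00:00", 1))
```

Whatever the hostname callable returns is what the webserver will try to resolve, which is why the choice of `AIRFLOW__CORE__HOSTNAME_CALLABLE` matters here.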

Then I tried replacing AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.getfqdn' with AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'

I got this error instead:

*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Fetching from: http://172.18.0.2:8793/log/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Failed to fetch log file from worker. [Errno 113] No route to host
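The difference between the two callables can be sketched like this (a rough approximation using only the standard library; `get_host_ip_address` is emulated here, not imported from Airflow):

```python
import socket

# socket.getfqdn() returns the container's own hostname (e.g. b7a0154e7e20),
# which machines outside this Docker network cannot resolve
# -> "Temporary failure in name resolution".
name = socket.getfqdn()

# airflow.utils.net.get_host_ip_address resolves the hostname to an IP instead;
# inside a container this is typically the bridge-network address
# (e.g. 172.18.0.2), which is only routable from the same Docker host
# -> "No route to host" from any other machine.
ip = socket.gethostbyname(socket.gethostname())

print(name, ip)
```

Either way, the value the worker registers is only meaningful inside its own Docker network, which matches both errors above.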

Then I tried mapping port 8793 of the worker to its host machine (see worker_4 below); now it returns:

*** Failed to fetch log file from worker. [Errno 111] Connection refused

but it sometimes still gives the "Temporary failure in name resolution" error.

I've also tried copying the URL from the error and replacing the IP with the host machine's IP, and got this message:

Forbidden
You don't have the permission to access the requested resource. It is either read-protected or not readable by the server.

Please let me know if additional info is needed.

Thanks in advance!

Below is my docker-compose.yml for the scheduler/webserver/flower:

version: '3.4'

x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.4}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-slack}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts


services:
  postgres:
    container_name: 'airflow-postgres'
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - ./data/postgres:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    ports:
      - '5432:5432'

  redis:
    image: redis:latest
    container_name: 'airflow-redis'
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    ports:
      - '6379:6379'
    

  airflow-webserver:
    <<: *airflow-common
    container_name: 'airflow-webserver'
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres

  airflow-scheduler:
    <<: *airflow-common
    container_name: 'airflow-scheduler'
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres

  airflow-worker1:
    build: ./worker_config
    container_name: 'airflow-worker_1'
    command: celery worker -H worker_1
    healthcheck:
      test:
      - "CMD-SHELL"
      - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes: 
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts

  airflow-worker2:
    build: ./worker_config
    container_name: 'airflow-worker_2'
    command: celery worker -H worker_2
    healthcheck:
      test:
      - "CMD-SHELL"
      - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes: 
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts

  flower:
    <<: *airflow-common
    container_name: 'airflow_flower'
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres

and my docker-compose.yml for worker on another machine:

version: '3.4'

x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX

x-airflow-common:
  &airflow-common
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts

services:
  worker_3:
    build: ./worker_config
    restart: always
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    entrypoint: airflow celery worker -H worker_3
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_147
    healthcheck:
      test: ['CMD-SHELL', '[ -f /usr/local/airflow/airflow-worker.pid ]']
      interval: 30s
      timeout: 30s
      retries: 3

  worker_4:
    build: ./worker_config_py2
    restart: always
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    entrypoint: airflow celery worker -H worker_4_py2 -q py2
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_4_py2
    healthcheck:
      test: ['CMD-SHELL', '[ -f /usr/local/airflow/airflow-worker.pid ]']
      interval: 30s
      timeout: 30s
      retries: 3
    ports:
      - 8793:8793

CodePudding user response:

For the issue "Failed to fetch log file from worker. [Errno -3] Temporary failure in name resolution":

It looks like the worker's hostname is not being resolved correctly. The webserver on the master has to reach the worker to fetch the log and display it on the front-end page, and to do that it must resolve the worker's hostname. Since that hostname clearly cannot be resolved, add a hostname-to-IP mapping on the master, e.g. in /etc/hosts.
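For example (the IP below is hypothetical; the hostname is taken from the error message in the question), on the machine running the webserver this could look like:

```text
# /etc/hosts on the machine running the Airflow webserver:
# map the worker's container hostname to the worker machine's IP
b7a0154e7e20    10.0.0.47
```

The extra_hosts key already used in the docker-compose files achieves the same mapping inside the webserver container.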

1. You need to use the same image in all your containers except the message broker, the metadata database, and the worker monitor. The Dockerfile is below.

2. If using LocalExecutor, the scheduler and the webserver must be on the same host.

Docker file:

FROM puckel/docker-airflow:1.10.9
COPY airflow/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt


To fix it:

First of all, get the configuration file by typing:

helm show values apache-airflow/airflow > values.yaml 

After that, check that fixPermissions is set to true.

You need to enable persistent volumes in the workers section of values.yaml:

    workers:
      persistence:
        # Enable persistent volumes
        enabled: true
        # Volume size for worker StatefulSet
        size: 10Gi
        # If using a custom storageClass, pass name ref to all statefulSets here
        storageClassName:
        # Execute init container to chown log directory
        fixPermissions: true

Update your installation by:

helm upgrade --install airflow apache-airflow/airflow -n ai

CodePudding user response:

Since you're not using Helm, you can still set up multi-node Airflow workers on different machines using the CeleryExecutor; you'll need to use the worker container's host IP.

The Celery backend needs to be configured to enable CeleryExecutor mode in the Airflow architecture. One useful application for the Celery broker is RabbitMQ.
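A sketch of what that can look like on the remote worker machine (the service name and port 8793 mirror the compose files above; the hostname value is a placeholder you'd make resolvable from the master, e.g. via its /etc/hosts or DNS):

```yaml
services:
  worker_3:
    # A name the master machine can resolve to this host's IP.
    hostname: worker-host-3
    ports:
      # Publish the Airflow log-server port so the webserver can fetch logs.
      - "8793:8793"
```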

Airflow Multi-Node Cluster with Celery Installation and Configuration steps:

1. Install RabbitMQ:

    yum install epel-release
    yum install rabbitmq-server

2. Enable and start the RabbitMQ server:

    systemctl enable rabbitmq-server.service
    systemctl start rabbitmq-server.service

3. Enable the RabbitMQ web management console:

    rabbitmq-plugins enable rabbitmq_management

RabbitMQ Web Management Console

The web management console listens on port 15672 (the AMQP broker itself uses port 5672); the default username and password for the console are guest/guest.

4. Install the pyamqp transport protocol for RabbitMQ:

    pip install pyamqp
    

amqp:// is an alias that uses librabbitmq if available, or py-amqp if it's not. Use pyamqp:// or librabbitmq:// if you want to specify exactly which transport to use. The pyamqp:// transport uses the 'amqp' library (http://github.com/celery/py-amqp).

Install the PostgreSQL adapter psycopg2 (Psycopg is a PostgreSQL adapter for the Python programming language):

    pip install psycopg2
5. Install Airflow:

    pip install 'apache-airflow[all]'

Check the Airflow version:

    airflow version
6. Initialize the database. After installation and configuration, you need to initialize the database before you can run DAGs and their tasks; the latest configuration changes are then reflected in the Airflow metadata database:

    airflow initdb

7. Install Celery on the master node and all the worker nodes:

    pip install celery==4.3.0

Check the version of Celery:

    celery --version
8. Change airflow.cfg for the Celery executor:

    executor = CeleryExecutor
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
    broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
    celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
    dags_are_paused_at_creation = True
    load_examples = False

Once you have made these changes in the configuration file airflow.cfg, update the Airflow metadata database with

    airflow initdb

and then restart Airflow. You can now start the webserver with the command below:

# default port is 8080

airflow webserver -p 8000

You can start the scheduler:

airflow scheduler

You also have to start the Airflow worker on each worker node:

airflow worker

Once you're done starting the various Airflow services, you can check the Airflow UI at http://<IP-ADDRESS/HOSTNAME>:8000
