I recently installed Airflow 2.1.4 with Docker containers. I've successfully set up Postgres, Redis, the scheduler, two local workers, and Flower on the same machine with docker-compose.
Now I want to expand, and set up workers on other machines.
I was able to get the workers up and running: Flower finds the worker node, and the worker receives tasks from the scheduler correctly. But regardless of the task's actual result, the task is marked as failed with an error message like the one below:
*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Fetching from: http://b7a0154e7e20:8793/log/test/test/2021-10-29T14:38:37.669734+00:00/1.log
*** Failed to fetch log file from worker. [Errno -3] Temporary failure in name resolution
Then I tried replacing AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.getfqdn'
with AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
I got this error instead:
*** Log file does not exist: /opt/airflow/logs/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Fetching from: http://172.18.0.2:8793/log/test/test/2021-10-28T15:47:59.625675+00:00/1.log
*** Failed to fetch log file from worker. [Errno 113] No route to host
Then I tried mapping port 8793 of the worker to its host machine (in worker_4 below). Now it returns:
*** Failed to fetch log file from worker. [Errno 111] Connection refused
but it sometimes still gives the "Temporary failure in name resolution" error.
I've also tried copying the URL from the error, replacing the IP with the host machine's IP, and got this message:
Forbidden
You don't have the permission to access the requested resource. It is either read-protected or not readable by the server.
Please let me know if additional info is needed.
Thanks in advance!
Below is my docker-compose.yml for the scheduler/webserver/flower:
version: '3.4'
x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.4}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    AIRFLOW_WEBSERVER_DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-slack}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts
services:
  postgres:
    container_name: 'airflow-postgres'
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - ./data/postgres:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    ports:
      - '5432:5432'
  redis:
    image: redis:latest
    container_name: 'airflow-redis'
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    ports:
      - '6379:6379'
  airflow-webserver:
    <<: *airflow-common
    container_name: 'airflow-webserver'
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres
  airflow-scheduler:
    <<: *airflow-common
    container_name: 'airflow-scheduler'
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres
  airflow-worker1:
    build: ./worker_config
    container_name: 'airflow-worker_1'
    command: celery worker -H worker_1
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts
  airflow-worker2:
    build: ./worker_config
    container_name: 'airflow-worker_2'
    command: celery worker -H worker_2
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      - redis
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./assets:/opt/airflow/assets
      - ./airflow.cfg:/opt/airflow/airflow.cfg
    extra_hosts: *extra_hosts
  flower:
    <<: *airflow-common
    container_name: 'airflow_flower'
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - redis
      - postgres
And here is my docker-compose.yml for the workers on another machine:
version: '3.4'
x-hosts: &extra_hosts
  postgres: XX.X.XX.XXX
  redis: XX.X.XX.XXX
x-airflow-common:
  &airflow-common
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'America/New_York'
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'airflow.utils.net.get_host_ip_address'
    AIRFLOW_WEBSERVER_DEFAULT_UI_TIMEZONE: 'America/New_York'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./assets:/opt/airflow/assets
    - ./airflow.cfg:/opt/airflow/airflow.cfg
    - /etc/hostname:/etc/hostname
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  extra_hosts: *extra_hosts
services:
  worker_3:
    build: ./worker_config
    restart: always
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    entrypoint: airflow celery worker -H worker_3
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_147
    healthcheck:
      test: ['CMD-SHELL', '[ -f /usr/local/airflow/airflow-worker.pid ]']
      interval: 30s
      timeout: 30s
      retries: 3
  worker_4:
    build: ./worker_config_py2
    restart: always
    extra_hosts: *extra_hosts
    volumes:
      - ./airflow.cfg:/opt/airflow/airflow.cfg
      - ./dags:/opt/airflow/dags
      - ./assets:/opt/airflow/assets
      - ./logs:/opt/airflow/logs
      - /etc/hostname:/etc/hostname
    entrypoint: airflow celery worker -H worker_4_py2 -q py2
    environment:
      <<: *airflow-common-env
      WORKER_NAME: worker_4_py2
    healthcheck:
      test: ['CMD-SHELL', '[ -f /usr/local/airflow/airflow-worker.pid ]']
      interval: 30s
      timeout: 30s
      retries: 3
    ports:
      - 8793:8793
CodePudding user response:
For this issue: " Failed to fetch log file from worker. [Errno -3] Temporary failure in name resolution"
It looks like the worker's hostname is not being resolved correctly. The webserver on the master has to contact the worker to fetch the log and display it in the UI, and it locates the worker by its hostname. Since that hostname cannot be resolved, add a hostname-to-IP mapping to /etc/hosts on the master (for example with vim /etc/hosts).
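To see which step actually fails, you can run a small check from inside the webserver container. The following is only a rough sketch, not part of Airflow: the function names parse_log_url and diagnose are made up for illustration, and it assumes the "Fetching from:" line is copied from the task log page as shown in the question.

```python
import socket
from urllib.parse import urlsplit


def parse_log_url(line):
    """Extract (host, port) from a '*** Fetching from: <url>' log line."""
    _, found, url = line.partition("Fetching from:")
    if not found:
        raise ValueError("line does not contain 'Fetching from:'")
    parts = urlsplit(url.strip())
    # Fall back to port 80 only if the URL carries no explicit port.
    return parts.hostname, parts.port or 80


def diagnose(host, port, timeout=3.0):
    """Report whether name resolution or the TCP connection is the failing step."""
    try:
        ip = socket.gethostbyname(host)
    except socket.gaierror as exc:
        # Corresponds to "[Errno -3] Temporary failure in name resolution".
        return f"name resolution failed for {host}: {exc}"
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return f"ok: {host} resolves to {ip} and port {port} accepts connections"
    except OSError as exc:
        # Covers "No route to host" and "Connection refused".
        return f"{host} resolves to {ip}, but connecting to port {port} failed: {exc}"
```

Calling diagnose(*parse_log_url(line)) with the line from the log tells you whether an /etc/hosts (or extra_hosts) entry is missing, or whether the name resolves but the worker's log port is not reachable from the webserver.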
1. You need the same image for all of your containers except the message broker, the metadata database, and the worker monitor. The Dockerfile is shown below.
2. If using LocalExecutor, the scheduler and the webserver must be on the same host.
Dockerfile:
FROM puckel/docker-airflow:1.10.9
COPY airflow/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
These are the dependencies Docker needs to deploy the webserver:
webserver:
To fix it, first get the configuration file by typing:
helm show values apache-airflow/airflow > values.yaml
After that check that fixPermissions is true.
You need to enable persistent volumes:
# Enable persistent volumes
enabled: true
# Volume size for worker StatefulSet
size: 10Gi
# If using a custom storageClass, pass name ref to all statefulSets here
storageClassName:
# Execute init container to chown log directory.
fixPermissions: true
Update your installation by:
helm upgrade --install airflow apache-airflow/airflow -n ai
CodePudding user response:
Since you're not using Helm, you can still set up multi-node Airflow workers on different machines by using CeleryExecutor. You'll need to use the IP of the machine that hosts the worker container.
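One way to make a worker report its host machine's IP instead of the container-internal address (the 172.18.0.2 in the question) is a custom hostname callable. This is only a sketch under assumptions: the module name worker_hostname and the environment variable WORKER_HOST_IP are hypothetical, the module has to be importable inside the worker container, and port 8793 still has to be published on the host.

```python
# worker_hostname.py (hypothetical module name)
import os
import socket


def get_hostname():
    """Return the address other Airflow components should use to reach this worker."""
    # WORKER_HOST_IP is a made-up variable for this sketch; set it in the worker's
    # compose file to the IP of the machine that runs the container.
    # If it is unset or empty, fall back to the default behaviour.
    return os.environ.get("WORKER_HOST_IP") or socket.getfqdn()
```

With that in place, the worker's environment would set AIRFLOW__CORE__HOSTNAME_CALLABLE: 'worker_hostname.get_hostname', using the same dotted format as the 'socket.getfqdn' value in the question.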
A Celery message broker needs to be configured to enable CeleryExecutor mode in the Airflow architecture. One commonly used broker for Celery is RabbitMQ.
Airflow Multi-Node Cluster with Celery Installation and Configuration steps:
Install RabbitMQ
yum install epel-release
yum install rabbitmq-server
Enable and start RabbitMQ Server
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
Enable RabbitMQ Web Management Console Interface
rabbitmq-plugins enable rabbitmq_management
RabbitMQ Web Management Console
The RabbitMQ management console listens on port 15672 by default, and the default username and password for it are guest/guest.
Install the pyamqp transport protocol for RabbitMQ and the PostgreSQL adapter
pip install pyamqp
amqp:// is an alias that uses librabbitmq if available, or py-amqp if it’s not. You’d use pyamqp:// or librabbitmq:// if you want to specify exactly what transport to use. The pyamqp:// transport uses the ‘amqp’ library (http://github.com/celery/py-amqp)
Install the PostgreSQL adapter, psycopg2. Psycopg is a PostgreSQL adapter for the Python programming language.
pip install psycopg2
Install Airflow
pip install 'apache-airflow[all]'
Check version of airflow
airflow version
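Initialize Database
After installation and configuration, you need to initialize the database before you can run the DAGs and their tasks. All the latest configuration changes are then reflected in the Airflow metadata. Note that airflow initdb is the Airflow 1.10 command; on Airflow 2.x, the version used in the question, the equivalent is airflow db init.
airflow initdb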
Celery Installation
Celery should be installed on the master node and on all the worker nodes.
pip install celery==4.3.0
Check the version of Celery
celery --version
Change in airflow.cfg file for Celery Executor
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
dags_are_paused_at_creation = True
load_examples = False
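If you configure Airflow through environment variables in docker-compose instead, as the question does, the same settings are written as AIRFLOW__{SECTION}__{KEY} with double underscores. A small sketch of that naming rule follows; the helper env_to_cfg is made up for illustration and is not an Airflow function.

```python
def env_to_cfg(name):
    """Map an AIRFLOW__SECTION__KEY variable name to its (section, key) in airflow.cfg.

    Returns None when the name does not follow the double-underscore pattern,
    in which case it is not picked up as a configuration option.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()
```

For example, AIRFLOW__CORE__EXECUTOR maps to executor in the [core] section, while AIRFLOW_WEBSERVER_DEFAULT_UI_TIMEZONE from the question's compose files does not match the pattern because it uses single underscores.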
Once you have made these changes in the configuration file airflow.cfg, you have to update the Airflow metadata with the command
airflow initdb
and then restart Airflow. You can now start the Airflow webserver with the command below:
# default port is 8080
airflow webserver -p 8000
You can start the scheduler
airflow scheduler
You also have to start the Airflow worker on each worker node (on Airflow 2.x the command is airflow celery worker, as in the question's compose files).
airflow worker
Once you’re done with starting various airflow services, you can check airflow UI at http://<IP-ADDRESS/HOSTNAME>:8000