GOAL
The goal is to get a very simple Airflow Docker setup where I just install a few very basic pip packages from requirements.txt, then run a Python script that uses numpy and pandas.
COMMANDS
docker build -t my38 .
docker-compose up airflow-init
docker-compose up -d
FILES
airflow/Dockerfile
# Extend the official Airflow image with the extra Python packages listed in
# requirements.txt.
FROM apache/airflow:latest-python3.8
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the resulting image layer.
RUN pip install --no-cache-dir -r requirements.txt
airflow/requirements.txt
# Packages installed into the custom image by `pip install -r requirements.txt`.
# NOTE(review): the base image tag is `latest-python3.8`, so the Airflow version
# it ships may differ from the pin below - confirm they match.
apache-airflow==2.4.0
pandas==1.4.2
numpy==1.20.3
pendulum==2.1.2
airflow/docker-compose.yml (THIS IS FROM THE OFFICIAL AIRFLOW SITE)
---
version: '3'
# Shared settings that each Airflow service pulls in with `<<: *airflow-common`.
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest-python3.8}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # SQLAlchemy URLs use the `dialect+driver://` form; the `+` is required.
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
services:
  # Metadata database used by the Airflow services.
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  # Celery broker (see AIRFLOW__CELERY__BROKER_URL).
  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      # Quoted so the mapping is always read as a string.
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  # One-shot service: checks the Airflow version and host resources, creates
  # and chowns the mounted directories, then hands over to the image entrypoint.
  # `$$` is a literal `$` for bash; a single `${...}` is interpolated by Compose.
  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
# Named volume mounted by the postgres service at /var/lib/postgresql/data.
volumes:
  postgres-db-volume:
airflow/dag/test_np.py (THIS IS FROM THE OFFICIAL AIRFLOW SITE)
import pendulum

from airflow import DAG
from airflow.decorators import task

# DAG with no schedule (schedule=None) and a single task that builds and
# prints a small numpy array.
with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_array():
        """Print Numpy array.

        Builds a 3x5 integer array, prints it, and returns it.
        """
        import numpy as np  # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE

        a = np.arange(15).reshape(3, 5)
        print(a)
        # NOTE: the return value is pushed to XCom. An ndarray is not JSON
        # serializable, so this fails unless XCom pickling is enabled
        # (AIRFLOW__CORE__ENABLE_XCOM_PICKLING) or a plain list is returned,
        # e.g. `a.tolist()`.
        return a

    print_array()
OUTPUT LOG
** Reading local file: /opt/airflow/logs/dag_id=example_python_operator/run_id=manual__2022-09-23T17:31:35.153420+00:00/task_id=print_array/attempt=1.log
[2022-09-23, 18:31:35 BST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_array manual__2022-09-23T17:31:35.153420+00:00 [queued]>
[2022-09-23, 18:31:35 BST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_array manual__2022-09-23T17:31:35.153420+00:00 [queued]>
[2022-09-23, 18:31:35 BST] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-09-23, 18:31:35 BST] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-09-23, 18:31:35 BST] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-09-23, 18:31:35 BST] {taskinstance.py:1383} INFO - Executing <Task(_PythonDecoratedOperator): print_array> on 2022-09-23 17:31:35.153420+00:00
[2022-09-23, 18:31:35 BST] {standard_task_runner.py:54} INFO - Started process 75 to run task
[2022-09-23, 18:31:35 BST] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'example_python_operator', 'print_array', 'manual__2022-09-23T17:31:35.153420+00:00', '--job-id', '59', '--raw', '--subdir', 'DAGS_FOLDER/test_np.py', '--cfg-path', '/tmp/tmpyjtdxl2p']
[2022-09-23, 18:31:35 BST] {standard_task_runner.py:83} INFO - Job 59: Subtask print_array
[2022-09-23, 18:31:35 BST] {dagbag.py:525} INFO - Filling up the DagBag from /opt/***/dags/test_np.py
[2022-09-23, 18:31:35 BST] {task_command.py:384} INFO - Running <TaskInstance: example_python_operator.print_array manual__2022-09-23T17:31:35.153420+00:00 [running]> on host c1db16ac8cf8
[2022-09-23, 18:31:35 BST] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_python_operator
AIRFLOW_CTX_TASK_ID=print_array
AIRFLOW_CTX_EXECUTION_DATE=2022-09-23T17:31:35.153420+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-23T17:31:35.153420+00:00
[2022-09-23, 18:31:35 BST] {logging_mixin.py:117} INFO - [[ 0 1 2 3 4]
[ 5 6 7 8 9]
[10 11 12 13 14]]
[2022-09-23, 18:31:35 BST] {python.py:177} INFO - Done. Returned value was: [[ 0 1 2 3 4]
[ 5 6 7 8 9]
[10 11 12 13 14]]
[2022-09-23, 18:31:35 BST] {xcom.py:599} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-09-23, 18:31:35 BST] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2374, in xcom_push
XCom.set(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 206, in set
value = cls.serialize_value(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 597, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type ndarray is not JSON serializable
[2022-09-23, 18:31:35 BST] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=example_python_operator, task_id=print_array, execution_date=20220923T173135, start_date=20220923T173135, end_date=20220923T173135
[2022-09-23, 18:31:35 BST] {standard_task_runner.py:102} ERROR - Failed to execute job 59 for task print_array (Object of type ndarray is not JSON serializable; 75)
[2022-09-23, 18:31:35 BST] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-09-23, 18:31:35 BST] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
TRIED TO SOLVE THE ISSUE
- {xcom.py:599} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config. -> airflow.cfg does not get generated, so I cannot just add the line "enable_xcom_pickling = True" to it - https://github.com/apache/airflow/issues/13487 OR Airflow Xcom: How to cast byte array for value into text or json text in SQL?
CodePudding user response:
I had to add AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
to my docker-compose.yml file - https://github.com/apache/airflow/issues/13487#issuecomment-757661848
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest-python3.8}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # SQLAlchemy URLs use the `dialect+driver://` form; the `+` is required.
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # Lets XCom store values that cannot be JSON-serialized (e.g. numpy arrays)
    # by pickling them. NOTE(review): unpickling is unsafe with untrusted data -
    # confirm this is acceptable for the deployment.
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'