GOAL
- I need this because the ExternalPythonOperator feature has only been available since Airflow 2.4.0 (19 Oct. 2022)
- https://airflow.apache.org/docs/docker-stack/build.html#important-notes-for-the-base-images
- ExternalPythonOperator - https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#externalpythonoperator
System
- Ubuntu 20.04 LTS
- AMD x86
1st Original Dockerfile
- This is the Dockerfile the original image you can pull is built from - https://hub.docker.com/r/apache/airflow/dockerfile
1st Original image
- This is the image created from the original Dockerfile - https://hub.docker.com/layers/apache/airflow/latest/images/sha256-5015db92023bebb1e8518767bfa2e465b2f52270aca6a9cdef85d5d3e216d015?context=explore
My folder structure
airflow/
- Dockerfile
- requirements.txt
- docker-compose.yaml
- dags (folder)
- logs (folder)
- airv (folder)
- plugins (folder)
- airflow (folder)
- .env
My requirements.txt
- requirements.txt does not need to have Airflow installed in it.
pandas==1.3.0
numpy==1.20.3
My Dockerfile [CORRECT]
- This pulls the original image and extends it
FROM apache/airflow:2.4.1-python3.8
# Compulsory: the base image sets PIP_USER=true, which prevents pip from installing into a venv
ENV PIP_USER=false
# Python venv setup
RUN python3 -m venv /opt/airflow/venv1
# Install dependencies into the venv:
COPY requirements.txt .
RUN /opt/airflow/venv1/bin/pip install -r requirements.txt
ENV PIP_USER=true
Terminal Command
docker build -t my-image-apache/airflow:2.4.1 .
docker-compose.yml
The official docker-compose.yaml file (https://airflow.apache.org/docs/apache-airflow/2.4.1/docker-compose.yaml), modified in this part:
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1} # <- matches the tag from the docker build command above
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.1} <--- THIS WAS THE ORIGINAL
Build and start the containers
docker-compose up
1.) DAG File
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
with DAG(
dag_id='test_external_python_venv_dag2',
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['my_test'],
) as dag:
#@task.external_python(task_id="test_external_python_venv_task", python=os.fspath(sys.executable))
@task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1/bin/activate'))
def test_external_python_venv_def():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
########## MY CODE ##########
import numpy as np
import pandas as pd
d = {'col1': [1, 2], 'col2': [3, 4]}
df = pd.DataFrame(data=d)
print(df)
a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
print(a)
#a= 10
return a
########## XXXXX MY CODE XXXXX ##########
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print('Please wait...', flush=True)
sleep(1)
print('Finished')
external_python_task = test_external_python_venv_def()
1.) LOG
*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T14:27:12.221899 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [queued]>
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [queued]>
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 14:27:12.221899 00:00
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:54} INFO - Started process 7262 to run task
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T14:27:12.221899 00:00', '--job-id', '76', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmpphwlwgkp']
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:83} INFO - Job 76: Subtask test_external_python_venv_task
[2022-10-06, 14:27:13 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
.....
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function__4.task_3>, task_group_function__4.task_2 already registered for DAG: example_task_group_decorator
[2022-10-06, 14:27:13 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T14:27:12.221899 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T14:27:12.221899 00:00
[2022-10-06, 14:27:14 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'
[2022-10-06, 14:27:14 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by /opt/airflow/venv1/bin/activate. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 682, in _get_python_version_from_environment
result = subprocess.check_output([self.python, "--version"], text=True)
File "/usr/local/lib/python3.8/subprocess.py", line 415, in check_output
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
File "/usr/local/lib/python3.8/subprocess.py", line 493, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/local/lib/python3.8/subprocess.py", line 858, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/usr/local/lib/python3.8/subprocess.py", line 1704, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 665, in execute_callable
python_version_as_list_of_strings = self._get_python_version_from_environment()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 685, in _get_python_version_from_environment
raise ValueError(f"Error while executing {self.python}: {e}")
ValueError: Error while executing /opt/airflow/venv1/bin/activate: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T142712, start_date=20221006T142713, end_date=20221006T142714
[2022-10-06, 14:27:14 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 76 for task test_external_python_venv_task (Error while executing /opt/airflow/venv1/bin/activate: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'; 7262)
[2022-10-06, 14:27:14 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 14:27:14 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
2.) DAG
@task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1/bin/'))
2.) LOG
*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T14:55:17.030808 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [queued]>
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [queued]>
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 14:55:17.030808 00:00
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:54} INFO - Started process 8456 to run task
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T14:55:17.030808 00:00', '--job-id', '79', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmppwy4xrz8']
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:83} INFO - Job 79: Subtask test_external_python_venv_task
[2022-10-06, 14:55:17 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 14:55:17 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:55:17 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
.....
[2022-10-06, 14:55:18 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function__4.task_3>, task_group_function__4.task_2 already registered for DAG: example_task_group_decorator
[2022-10-06, 14:55:18 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T14:55:17.030808 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T14:55:17.030808 00:00
[2022-10-06, 14:55:18 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1/bin/'
[2022-10-06, 14:55:18 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by /opt/airflow/venv1/bin/. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1/bin' must be a file
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T145517, start_date=20221006T145517, end_date=20221006T145518
[2022-10-06, 14:55:18 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 79 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1/bin' must be a file; 8456)
[2022-10-06, 14:55:18 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 14:55:18 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
3.) DAG
@task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1'))
3.) LOG
*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T15:08:54.148452 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [queued]>
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [queued]>
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 15:08:54.148452 00:00
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:54} INFO - Started process 9034 to run task
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T15:08:54.148452 00:00', '--job-id', '80', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmpipmwce8e']
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:83} INFO - Job 80: Subtask test_external_python_venv_task
[2022-10-06, 15:08:55 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
......
[2022-10-06, 15:08:55 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T15:08:54.148452 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T15:08:54.148452 00:00
[2022-10-06, 15:08:55 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1'
[2022-10-06, 15:08:55 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by /opt/airflow/venv1. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1' must be a file
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T150854, start_date=20221006T150855, end_date=20221006T150855
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 80 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1' must be a file; 9034)
[2022-10-06, 15:08:55 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 15:08:55 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
4.) DAG
@task.external_python(task_id="test_external_python_venv_task", python='/opt/airflow/venv1')
4.) LOG
*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T15:14:45.510735 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [queued]>
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [queued]>
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 15:14:45.510735 00:00
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:54} INFO - Started process 9286 to run task
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T15:14:45.510735 00:00', '--job-id', '82', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmp305tmh_g']
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:83} INFO - Job 82: Subtask test_external_python_venv_task
[2022-10-06, 15:14:46 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 15:14:46 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for
........
[2022-10-06, 15:14:46 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T15:14:45.510735 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T15:14:45.510735 00:00
[2022-10-06, 15:14:47 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1'
[2022-10-06, 15:14:47 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by /opt/airflow/venv1. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1' must be a file
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T151445, start_date=20221006T151446, end_date=20221006T151447
[2022-10-06, 15:14:47 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 82 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1' must be a file; 9286)
[2022-10-06, 15:14:47 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 15:14:47 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
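All four failures come down to the same checks the operator runs on its `python` argument: the path must be an existing, executable file (the interpreter binary itself), not the venv directory and not the `activate` script. A rough stand-alone sketch of those checks (my own approximation, not Airflow's actual code):

```python
import os

def check_external_python(path: str) -> str:
    """Approximate the validation ExternalPythonOperator applies to `python`."""
    if not os.path.isfile(path):
        # attempts 2-4: a directory (with or without trailing slash) fails here
        raise ValueError(f"Python Path '{path}' must be a file")
    if not os.access(path, os.X_OK):
        # attempt 1: `activate` is a shell script meant to be sourced, not
        # executed, so trying to run it yields Errno 13 (Permission denied)
        raise PermissionError(f"Permission denied: '{path}'")
    return path
```

Of the four variants tried above, only a path of the form `/opt/airflow/venv1/bin/python3` would pass both checks.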
CodePudding user response:
DAG file
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
with DAG(
dag_id='test_external_python_venv_dag2',
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['my_test'],
) as dag:
#@task.external_python(task_id="test_external_python_venv_task", python=os.fspath(sys.executable))
@task.external_python(task_id="test_external_python_venv_task", python='/opt/airflow/venv1/bin/python3')
def test_external_python_venv_def():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
########## MY CODE ##########
import numpy as np
import pandas as pd
d = {'col1': [1, 2], 'col2': [3, 4]}
df = pd.DataFrame(data=d)
print(df)
a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
print(a)
#a= 10
return a
########## XXXXX MY CODE XXXXX ##########
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print('Please wait...', flush=True)
sleep(1)
print('Finished')
external_python_task = test_external_python_venv_def()
docker-compose.yml
Add this line to the common environment section of your docker-compose.yml (the task returns a numpy array, which the default JSON XCom serialization cannot handle):
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
.....
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# I have added this
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
....
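To see why the flag is needed: the task returns a numpy array, the default XCom backend serializes values as JSON, and an ndarray is not JSON-serializable; pickling handles it. A quick stand-alone check:

```python
import json
import pickle

import numpy as np

a = np.array([[1, 2], [3, 4]])

# Default XCom serialization is JSON-based: an ndarray is rejected
try:
    json.dumps(a)
except TypeError as e:
    print(f"JSON: {e}")

# Pickle round-trips it fine, which is what ENABLE_XCOM_PICKLING switches on
blob = pickle.dumps(a)
assert (pickle.loads(blob) == a).all()
```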