How to use Docker Airflow - ExternalPythonOperator - python=os.fspath(sys.executable)?


GOAL

System

  • Ubuntu 20.04 LTS
  • AMD x86

1st Original Dockerfile

1st Original image

My folder structure

airflow/

  • Dockerfile
  • requirements.txt
  • docker-compose.yaml
  • dags (folder)
  • logs (folder)
  • airv (folder)
  • plugins (folder)
  • airflow (folder)
  • .env

MY requirements.txt

  • requirements.txt does not need to include Airflow itself; the base image already provides it.
pandas==1.3.0
numpy==1.20.3

My Dockerfile [CORRECT]

  • This pulls the original image and extends it
FROM apache/airflow:2.4.1-python3.8

# Required: turn off user-site installs so pip can write into the venv
ENV PIP_USER=false

# Create the virtual environment
RUN python3 -m venv /opt/airflow/venv1

# Install the dependencies into the venv
COPY requirements.txt .

RUN /opt/airflow/venv1/bin/pip install -r requirements.txt

# Restore the default behaviour for any later installs
ENV PIP_USER=true

Terminal Command

docker build -t my-image-apache/airflow:2.4.1 .
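Before wiring the venv into a DAG, it helps to confirm that the interpreter path you plan to pass actually runs. As the tracebacks below show, ExternalPythonOperator executes `[python, "--version"]` as a subprocess, so whatever you pass as `python` must be an executable interpreter binary. A minimal stand-alone sketch of that same check:

```python
# Stand-alone version of the check ExternalPythonOperator performs:
# it runs `<python> --version` as a subprocess, so `python` must be an
# executable interpreter binary, not a directory or a shell script
# such as bin/activate.
import subprocess
import sys

def check_interpreter(python: str) -> str:
    # Raises PermissionError / FileNotFoundError for anything that is
    # not an executable file -- the same failures seen in the logs below.
    return subprocess.check_output([python, "--version"], text=True)

# Works when pointed at a real interpreter:
print(check_interpreter(sys.executable))
```

Running this inside the container with `/opt/airflow/venv1/bin/python3` is a quick way to validate the image before touching any DAG code.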

docker-compose.yml

Start from the official docker-compose.yaml (https://airflow.apache.org/docs/apache-airflow/2.4.1/docker-compose.yaml) and modify this part:

---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1} #<- matches the tag from the docker build command above
#  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.1} <--- THIS WAS THE ORIGINAL

Start the containers

docker-compose up

1.) DAG File

"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)

PYTHON = sys.executable

BASE_DIR = tempfile.gettempdir()

with DAG(
    dag_id='test_external_python_venv_dag2',
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['my_test'],
) as dag:
    #@task.external_python(task_id="test_external_python_venv_task", python=os.fspath(sys.executable))
    @task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1/bin/activate'))
    def test_external_python_venv_def():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep
        ########## MY CODE ##########
        import numpy as np
        import pandas as pd
        d = {'col1': [1, 2], 'col2': [3, 4]}
        df = pd.DataFrame(data=d)
        print(df)
        a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
        print(a)
        #a= 10
        return a  # note: the example code below is unreachable after this return
        ########## XXXXX MY CODE XXXXX ##########

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print('Please wait...', flush=True)
            sleep(1)
        print('Finished')

    external_python_task = test_external_python_venv_def()

1.) LOG


*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T14:27:12.221899 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [queued]>
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [queued]>
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 14:27:13 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 14:27:12.221899 00:00
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:54} INFO - Started process 7262 to run task
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T14:27:12.221899 00:00', '--job-id', '76', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmpphwlwgkp']
[2022-10-06, 14:27:13 UTC] {standard_task_runner.py:83} INFO - Job 76: Subtask test_external_python_venv_task
[2022-10-06, 14:27:13 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
.....
[2022-10-06, 14:27:13 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function__4.task_3>, task_group_function__4.task_2 already registered for DAG: example_task_group_decorator
[2022-10-06, 14:27:13 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:27:12.221899 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T14:27:12.221899 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T14:27:12.221899 00:00
[2022-10-06, 14:27:14 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'
[2022-10-06, 14:27:14 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by  /opt/airflow/venv1/bin/activate. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 682, in _get_python_version_from_environment
    result = subprocess.check_output([self.python, "--version"], text=True)
  File "/usr/local/lib/python3.8/subprocess.py", line 415, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/local/lib/python3.8/subprocess.py", line 493, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/local/lib/python3.8/subprocess.py", line 858, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/local/lib/python3.8/subprocess.py", line 1704, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
PermissionError: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 665, in execute_callable
    python_version_as_list_of_strings = self._get_python_version_from_environment()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 685, in _get_python_version_from_environment
    raise ValueError(f"Error while executing {self.python}: {e}")
ValueError: Error while executing /opt/airflow/venv1/bin/activate: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'
[2022-10-06, 14:27:14 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T142712, start_date=20221006T142713, end_date=20221006T142714
[2022-10-06, 14:27:14 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 76 for task test_external_python_venv_task (Error while executing /opt/airflow/venv1/bin/activate: [Errno 13] Permission denied: '/opt/airflow/venv1/bin/activate'; 7262)
[2022-10-06, 14:27:14 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 14:27:14 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
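This first failure is worth unpacking: `/opt/airflow/venv1/bin/activate` is a shell script meant to be sourced, not an executable interpreter, and wrapping it in `os.fspath()` does not change that, because `os.fspath()` returns a plain `str` argument unchanged. A quick sketch:

```python
import os
from pathlib import Path

# On a plain str, os.fspath() is a no-op: it returns the string
# unchanged and performs no validation, so it cannot turn a script
# path into an interpreter path.
assert os.fspath('/opt/airflow/venv1/bin/activate') == '/opt/airflow/venv1/bin/activate'

# Its actual purpose is converting path-like objects to str:
assert os.fspath(Path('/opt') / 'airflow' / 'venv1') == '/opt/airflow/venv1'
```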

2.) DAG

@task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1/bin/'))

2.) LOG

*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T14:55:17.030808 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [queued]>
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [queued]>
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 14:55:17 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 14:55:17.030808 00:00
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:54} INFO - Started process 8456 to run task
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T14:55:17.030808 00:00', '--job-id', '79', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmppwy4xrz8']
[2022-10-06, 14:55:17 UTC] {standard_task_runner.py:83} INFO - Job 79: Subtask test_external_python_venv_task
[2022-10-06, 14:55:17 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 14:55:17 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 14:55:17 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
.....
[2022-10-06, 14:55:18 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function__4.task_3>, task_group_function__4.task_2 already registered for DAG: example_task_group_decorator
[2022-10-06, 14:55:18 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T14:55:17.030808 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T14:55:17.030808 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T14:55:17.030808 00:00
[2022-10-06, 14:55:18 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1/bin/'
[2022-10-06, 14:55:18 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by  /opt/airflow/venv1/bin/. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
    raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1/bin' must be a file
[2022-10-06, 14:55:18 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T145517, start_date=20221006T145517, end_date=20221006T145518
[2022-10-06, 14:55:18 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 79 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1/bin' must be a file; 8456)
[2022-10-06, 14:55:18 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 14:55:18 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

3.) DAG

@task.external_python(task_id="test_external_python_venv_task", python=os.fspath('/opt/airflow/venv1'))

3.) LOG

*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T15:08:54.148452 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [queued]>
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [queued]>
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 15:08:54.148452 00:00
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:54} INFO - Started process 9034 to run task
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T15:08:54.148452 00:00', '--job-id', '80', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmpipmwce8e']
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:83} INFO - Job 80: Subtask test_external_python_venv_task
[2022-10-06, 15:08:55 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for DAG: example_xcom_args
[2022-10-06, 15:08:55 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): print_value>, generate_value already registered for DAG: example_xcom_args
......
[2022-10-06, 15:08:55 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:08:54.148452 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T15:08:54.148452 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T15:08:54.148452 00:00
[2022-10-06, 15:08:55 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1'
[2022-10-06, 15:08:55 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by  /opt/airflow/venv1. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
    raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1' must be a file
[2022-10-06, 15:08:55 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T150854, start_date=20221006T150855, end_date=20221006T150855
[2022-10-06, 15:08:55 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 80 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1' must be a file; 9034)
[2022-10-06, 15:08:55 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 15:08:55 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
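Attempts 2 and 3 fail on the same validation: the operator requires `python` to be a file (the interpreter binary), not the venv directory or its `bin/` folder. A hypothetical helper that mirrors this rule and could be used to catch the mistake before triggering a DAG run:

```python
import os
from pathlib import Path

# Hypothetical pre-check mirroring the operator's "must be a file"
# rule: accept only an existing, executable regular file.
def is_valid_interpreter(python: str) -> bool:
    p = Path(python)
    return p.is_file() and os.access(p, os.X_OK)

print(is_valid_interpreter('/opt/airflow/venv1'))      # missing/dir -> False
print(is_valid_interpreter('/opt/airflow/venv1/bin'))  # missing/dir -> False
```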

4.) DAG

@task.external_python(task_id="test_external_python_venv_task", python='/opt/airflow/venv1')

4.) LOG

*** Reading local file: /opt/airflow/logs/dag_id=test_external_python_venv_dag2/run_id=manual__2022-10-06T15:14:45.510735 00:00/task_id=test_external_python_venv_task/attempt=1.log
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [queued]>
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [queued]>
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2022-10-06, 15:14:46 UTC] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): test_external_python_venv_task> on 2022-10-06 15:14:45.510735 00:00
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:54} INFO - Started process 9286 to run task
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_external_python_venv_dag2', 'test_external_python_venv_task', 'manual__2022-10-06T15:14:45.510735 00:00', '--job-id', '82', '--raw', '--subdir', 'DAGS_FOLDER/test_venv2.py', '--cfg-path', '/tmp/tmp305tmh_g']
[2022-10-06, 15:14:46 UTC] {standard_task_runner.py:83} INFO - Job 82: Subtask test_external_python_venv_task
[2022-10-06, 15:14:46 UTC] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_venv2.py
[2022-10-06, 15:14:46 UTC] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): generate_value>, print_value already registered for 
........
[2022-10-06, 15:14:46 UTC] {task_command.py:384} INFO - Running <TaskInstance: test_external_python_venv_dag2.test_external_python_venv_task manual__2022-10-06T15:14:45.510735 00:00 [running]> on host 1b2db7bf2320
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_external_python_venv_dag2
AIRFLOW_CTX_TASK_ID=test_external_python_venv_task
AIRFLOW_CTX_EXECUTION_DATE=2022-10-06T15:14:45.510735 00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-06T15:14:45.510735 00:00
[2022-10-06, 15:14:47 UTC] {python.py:725} WARNING - When checking for Airflow installed in venv got [Errno 13] Permission denied: '/opt/airflow/venv1'
[2022-10-06, 15:14:47 UTC] {python.py:726} WARNING - This means that Airflow is not properly installed by  /opt/airflow/venv1. Airflow context keys will not be available. Please Install Airflow 2.4.1 in your environment to access them.
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 662, in execute_callable
    raise ValueError(f"Python Path '{python_path}' must be a file")
ValueError: Python Path '/opt/airflow/venv1' must be a file
[2022-10-06, 15:14:47 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_external_python_venv_dag2, task_id=test_external_python_venv_task, execution_date=20221006T151445, start_date=20221006T151446, end_date=20221006T151447
[2022-10-06, 15:14:47 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 82 for task test_external_python_venv_task (Python Path '/opt/airflow/venv1' must be a file; 9286)
[2022-10-06, 15:14:47 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-10-06, 15:14:47 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

CodePudding user response:

dag file

"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)

PYTHON = sys.executable

BASE_DIR = tempfile.gettempdir()

with DAG(
    dag_id='test_external_python_venv_dag2',
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['my_test'],
) as dag:
    #@task.external_python(task_id="test_external_python_venv_task", python=os.fspath(sys.executable))
    @task.external_python(task_id="test_external_python_venv_task", python='/opt/airflow/venv1/bin/python3')
    def test_external_python_venv_def():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep
        ########## MY CODE ##########
        import numpy as np
        import pandas as pd
        d = {'col1': [1, 2], 'col2': [3, 4]}
        df = pd.DataFrame(data=d)
        print(df)
        a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
        print(a)
        #a= 10
        return a  # note: the example code below is unreachable after this return
        ########## XXXXX MY CODE XXXXX ##########

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print('Please wait...', flush=True)
            sleep(1)
        print('Finished')

    external_python_task = test_external_python_venv_def()

docker-compose.yml

Add this line to your docker-compose.yml:

AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
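This flag matters here because the task returns a numpy array, and Airflow's default XCom backend serializes return values as JSON, which rejects arbitrary objects; enabling pickling lets XCom fall back to `pickle`. A minimal illustration with a stand-in object (not numpy, to keep the sketch dependency-free):

```python
import json
import pickle

class ArrayLike:  # stand-in for a numpy array: not JSON-serializable
    def __init__(self, data):
        self.data = data

value = ArrayLike([[1, 2], [3, 4]])

try:
    json.dumps(value)          # default XCom path: fails
except TypeError as e:
    print(f"JSON failed: {e}")

blob = pickle.dumps(value)     # pickling path: succeeds
restored = pickle.loads(blob)
print(restored.data)           # [[1, 2], [3, 4]]
```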

.....
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}

    # Added: enable XCom pickling so the task's return value can be serialized
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
....