Specifying dependencies needed for a .py file run through BashOperator() with Apache Airflow


I'm trying to automate the process of running a web scraper daily with Apache Airflow and Docker. I have the Airflow server up and running, and I can manually trigger my DAG through the Airflow GUI on the local server, but it's failing.

I'm not sure where to even see what errors are being triggered. My dag.py file is below; you can see where I'm trying to use BashOperator to run the script. I suspect the issue is with the dependencies the scraper uses, but I'm not sure how to make the config file and the other required packages available to the script when it runs through Airflow/Docker.

from airflow.models import DAG
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator

dag = DAG("MI_Spider", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)

curl = BashOperator(
    task_id='testingbash',
    bash_command="python ~/spider/path/MichiganSpider.py",
    dag=dag)

Should I move the spider file and config file into the Airflow project directory, or somehow install the dependencies directly into the Docker container I'm using? And should I set environment variables inside the container instead of loading the DB login credentials from a separate config file? I've been using a conda env for the scraper when I run it manually. Is there any way I can just use that environment?

I'm very new to Docker and Apache Airflow, so I apologize if this stuff should be obvious.

Thank you in advance!!

CodePudding user response:

Assuming you are on a pretty recent version of Airflow, I recommend refactoring your DAG to use the PythonVirtualenvOperator (https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator) instead of BashOperator.

Here's an example of how to use Python operators in Airflow: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/example_python_operator.html

The part relevant to you is:

import logging
import shutil

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)


with DAG(
    dag_id='example_python_operator',
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
) as dag:
    if not shutil.which("virtualenv"):
        log.warning("The virtualenv_python example task requires virtualenv, please install it.")
    else:
        @task.virtualenv(
            task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
        )
        def callable_virtualenv():
            """
            Example function that will be performed in a virtual environment.

            Importing at the module level ensures that it will not attempt to import the
            library before it is installed.
            """
            from time import sleep

            from colorama import Back, Fore, Style

            print(Fore.RED + 'some red text')
            print(Back.GREEN + 'and with a green background')
            print(Style.DIM + 'and in dim text')
            print(Style.RESET_ALL)
            for _ in range(10):
                print(Style.DIM + 'Please wait...', flush=True)
                sleep(10)
            print('Finished')

        virtualenv_task = callable_virtualenv()
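
Applied to your spider, the DAG could look roughly like the sketch below. The requirements list, the environment-variable names, and the body of the task are all assumptions on my part — substitute whatever your scraper actually imports (running pip freeze inside your conda env will give you the exact pins). Reading the DB credentials from environment variables set on the container replaces the separate config file:

import pendulum

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="MI_Spider",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:

    @task.virtualenv(
        task_id="run_michigan_spider",
        # Assumed packages -- replace with your scraper's real dependencies.
        requirements=["requests==2.26.0", "beautifulsoup4==4.10.0"],
        system_site_packages=False,
    )
    def run_spider():
        # Imports must live inside the function so they are resolved in the
        # virtualenv, not in the main Airflow environment.
        import os

        # Hypothetical variable names -- set these on the Docker container
        # (e.g. in the environment section of your docker-compose file)
        # instead of shipping a config file with credentials.
        db_user = os.environ["SPIDER_DB_USER"]
        db_password = os.environ["SPIDER_DB_PASSWORD"]

        # ... run your MichiganSpider logic here using db_user/db_password ...

    spider_task = run_spider()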

Just remember to have the virtualenv package available in your Airflow image.
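
If you build a custom image on top of the official one, a minimal Dockerfile sketch that does this (the base tag here is only an example — use the version you actually run):

FROM apache/airflow:2.2.3

# virtualenv is needed at runtime so @task.virtualenv can create its envs
RUN pip install --no-cache-dir virtualenv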
