Referencing external file in Shell script using BashOperator

Time:08-05

I have a shell script, generate_rpt.sh, that reads data from a CSV file and performs certain operations on each row, as shown below:

#!/bin/bash
src=$1
dst=$2
file_name=$3

while IFS='' read -r line || [[ -n "$line" ]]; do
    line=${line//$';'/,}
    IFS=',' read -r -a columns <<< "$line"
    # do something with each row
done < "$file_name"

I have a BashOperator that calls the above shell script, defined as follows:

execute_cmd = BashOperator(
    task_id="execute_cmd",
    provide_context=True,
    bash_command="scripts/generate_rpt.sh",
    params={'src':'src_prj','dst':'dst_prj','file_name':'rpt_config.csv'}
)

The CSV file rpt_config.csv and the shell script generate_rpt.sh are both in the scripts folder.

DAG Folder location - /home/airflow/gcs/dags/sus

Scripts folder location - /home/airflow/gcs/dags/sus/scripts

I keep getting the error that the file rpt_config.csv was not found. I have tried giving the full file path, i.e. /home/airflow/gcs/dags/sus/scripts/rpt_config.csv, but I still get the same error.

When I execute the shell script from the gcloud shell, it runs successfully without any errors, but when I execute it via the DAG it fails. Can someone help me understand how to handle this?

Airflow version - composer-1.16.6-airflow-1.10.15

Python version 2

Answer:

I was able to make your DAG and Bash script work by giving the full path for both generate_rpt.sh and rpt_config.csv. I also followed your folder structure, with the scripts in the sus/scripts folder under the DAGs folder.

In my DAG, I used the environment variable DAGS_FOLDER to get the path of the DAGs folder and built the full path of each file from it. I also invoked the script with bash and passed the arguments directly on the command line. See the DAG below:

import datetime
import os

from airflow import models
from airflow.operators import bash

DAGS_FOLDER = os.environ["DAGS_FOLDER"]
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'run_bash',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

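    execute_cmd = bash.BashOperator(
        task_id='execute_cmd',
        # Absolute paths for both the script and the CSV file;
        # the three arguments map to $1, $2 and $3 in generate_rpt.sh.
        bash_command=(
            f'bash {DAGS_FOLDER}/sus/scripts/generate_rpt.sh '
            f'src_prj dst_prj {DAGS_FOLDER}/sus/scripts/rpt_config.csv '
        ),
    )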

    execute_cmd

NOTE: I updated your generate_rpt.sh to print the lines of the CSV file, so the task log shows the contents of rpt_config.csv. I'm using Composer version 1.19.5 and Airflow version 2.2.5.
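
If it helps, the command string can also be assembled by a small helper, so the absolute paths are built in one place and every argument is quoted. This is only a sketch under a few assumptions: build_report_command is a hypothetical name (it is not part of Airflow), the sus/scripts layout and the fallback path are taken from the question, and it needs Python 3 because it uses shlex.quote.

```python
import os
import shlex


def build_report_command(dags_folder, src, dst, config_name="rpt_config.csv"):
    """Return a bash command that uses absolute paths for the script and the CSV.

    Hypothetical helper for illustration; the folder layout is the one
    described in the question (<dags_folder>/sus/scripts).
    """
    scripts_dir = os.path.join(dags_folder, "sus", "scripts")
    script = os.path.join(scripts_dir, "generate_rpt.sh")
    config = os.path.join(scripts_dir, config_name)
    # Quote every argument so values containing spaces survive the shell.
    args = [shlex.quote(a) for a in (script, src, dst, config)]
    # The arguments become $1, $2 and $3 inside generate_rpt.sh.
    return "bash " + " ".join(args)


# DAGS_FOLDER is read from the environment, as in the DAG above;
# the fallback value is an assumption based on the paths in the question.
dags_folder = os.environ.get("DAGS_FOLDER", "/home/airflow/gcs/dags")
command = build_report_command(dags_folder, "src_prj", "dst_prj")
print(command)
```

The resulting string can be passed as bash_command to the BashOperator. Note that values placed in params are only available to Jinja templating (for example {{ params.src }}); they are not handed to the script as positional arguments, which is why the arguments are written into the command itself here.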
