I have a shell script, generate_rpt.sh, that reads data from a CSV file and performs certain operations, as shown below:
#!/bin/bash
src=$1
dst=$2
file_name=$3
while IFS='' read -r line || [[ -n "$line" ]]; do
    line=${line//$';'/,}
    IFS=',' read -r -a columns <<< "$line"
    # do something with each row
done < "${3}"
I have a BashOperator that calls the above shell script, defined as follows:
execute_cmd = BashOperator(
    task_id="execute_cmd",
    provide_context=True,
    bash_command="scripts/generate_rpt.sh",
    params={'src': 'src_prj', 'dst': 'dst_prj', 'file_name': 'rpt_config.csv'}
)
The CSV file rpt_config.csv and the shell script generate_rpt.sh are both in the scripts folder.
DAG Folder location - /home/airflow/gcs/dags/sus
Scripts folder location - /home/airflow/gcs/dags/sus/scripts
I keep getting the error that rpt_config.csv is not found. I have tried giving the full file path, i.e. /home/airflow/gcs/dags/sus/scripts/rpt_config.csv, but I still get the same error.
When I execute the shell script via gcloud shell, it works without any errors, but when I execute it via the DAG, it fails. Can someone help me understand how to handle this?
Airflow version - composer-1.16.6-airflow-1.10.15
Python version 2
Answer:
I was able to make your DAG and bash script work by defining the full path for both generate_rpt.sh and rpt_config.csv. I also followed your folder structure, with both files in the sus/scripts folder under the DAGs folder.
In my DAG I used the environment variable DAGS_FOLDER to get the path of the dags folder and used it to provide the full path of the files. I also used bash to call the script and passed the parameters on the command itself. See the DAG below:
import datetime
import os

from airflow import models
from airflow.operators import bash

DAGS_FOLDER = os.environ["DAGS_FOLDER"]
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'run_bash',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:
    execute_cmd = bash.BashOperator(
        task_id='execute_cmd',
        bash_command=f'bash {DAGS_FOLDER}/sus/scripts/generate_rpt.sh src_prj dst_prj {DAGS_FOLDER}/sus/scripts/rpt_config.csv ',
    )

    execute_cmd
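The command string in the DAG above can also be assembled with a small helper instead of an f-string, which may be useful because f-strings need Python 3.6 or later while the question mentions Python 2. The following is only a sketch: build_report_command is a hypothetical helper name, the fallback path is taken from the question, and shlex.quote is the Python 3 spelling (on Python 2 the equivalent is pipes.quote).

```python
import os
import posixpath
import shlex


def build_report_command(dags_folder, src, dst, config_name="rpt_config.csv"):
    """Return a bash command line that uses absolute paths for the script and CSV."""
    # Assumption: the scripts live under <dags_folder>/sus/scripts, as in the question.
    scripts_dir = posixpath.join(dags_folder, "sus", "scripts")
    parts = [
        "bash",
        posixpath.join(scripts_dir, "generate_rpt.sh"),
        src,
        dst,
        posixpath.join(scripts_dir, config_name),
    ]
    # Quote each part so spaces or shell metacharacters cannot split an argument.
    return " ".join(shlex.quote(part) for part in parts)


# Assumption: fall back to the path from the question when DAGS_FOLDER is not set.
DAGS_FOLDER = os.environ.get("DAGS_FOLDER", "/home/airflow/gcs/dags")
REPORT_COMMAND = build_report_command(DAGS_FOLDER, "src_prj", "dst_prj")
```

REPORT_COMMAND would then be passed as the bash_command argument of the BashOperator in place of the f-string.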
NOTE: I updated your generate_rpt.sh to print the lines in the CSV file, so the task output shows the file being read. I'm using Composer version 1.19.5 and Airflow version 2.2.5.
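If you would rather keep the params dictionary from the question, note that params values are exposed to Jinja templates in bash_command rather than handed to the script as positional arguments, so they have to be referenced explicitly in the command. A minimal sketch of that variant, assuming the same sus/scripts layout; the names SCRIPTS_DIR, REPORT_PARAMS and TEMPLATED_COMMAND are hypothetical and the fallback path is taken from the question:

```python
import os

# Assumption: fall back to the path from the question when DAGS_FOLDER is not set.
DAGS_FOLDER = os.environ.get("DAGS_FOLDER", "/home/airflow/gcs/dags")
SCRIPTS_DIR = DAGS_FOLDER + "/sus/scripts"

# Same values as the params dictionary in the question.
REPORT_PARAMS = {"src": "src_prj", "dst": "dst_prj", "file_name": "rpt_config.csv"}

# Airflow renders the {{ params.* }} placeholders before running the command,
# so the script receives them as $1, $2 and $3.
TEMPLATED_COMMAND = " ".join([
    "bash",
    SCRIPTS_DIR + "/generate_rpt.sh",
    "{{ params.src }}",
    "{{ params.dst }}",
    SCRIPTS_DIR + "/{{ params.file_name }}",
])
```

Here TEMPLATED_COMMAND would be passed as bash_command and REPORT_PARAMS as params. Starting the command with bash and not ending it in .sh also avoids Airflow treating the string as the path of a template file to load.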