I am running Airflow in a Docker container on my local machine. I'm running a test DAG that performs three tasks. All three tasks run fine; however, the last task, which uses the BashOperator, is stuck in a loop, as seen in the picture at the bottom. Looking at the log file, an entry is only generated for the first execution of the Python script called from bash; after that nothing is logged, but the Python file keeps getting executed. Any suggestions as to what could be the issue?
Thanks,
Richard
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
def creating_dataframe(ti, *, loc=r'/opt/airflow/dags/', filename=r'demo.csv'):
    """Ensure the demo CSV exists and publish its location via XCom.

    The full path is pushed to XCom under the key ``'df_location'`` before
    the file is checked, so the path is published whether or not the file
    already exists.

    Args:
        ti: Task-instance-like object; only ``ti.xcom_push(key=..., value=...)``
            is called on it.
        loc: Directory that holds the CSV file.
        filename: Name of the CSV file inside ``loc``.

    Returns:
        The full path of the CSV file as a string.
    """
    import os

    import pandas as pd

    # Build the path once; os.path.join also copes with a directory that
    # has no trailing separator.
    df_location = os.path.join(loc, filename)
    ti.xcom_push(key='df_location', value=df_location)

    if os.path.exists(df_location):
        print("if exists")
    else:
        # First run: seed the file with a single timestamp-indexed row.
        df = pd.DataFrame(
            {'GIA_AIRFLOW_DEMO': ['First entry']},
            index=[pd.Timestamp.now()],
        )
        df.to_csv(df_location, sep=';')
        print("does not exist")
    return df_location
def adding_row_to_dataframe(ti):
    """Append one timestamped row to the CSV published by ``creating_dataframe``.

    The CSV location is read from XCom (key ``'df_location'``, task id
    ``'creating_dataframe'``). The file is loaded, one row is appended and
    the result is written back to the same path.

    Args:
        ti: Task-instance-like object; only
            ``ti.xcom_pull(key=..., task_ids=...)`` is called on it.

    Raises:
        ValueError: If no location was found in XCom.
    """
    import pandas as pd

    # A single task id (not a list) is passed so the value itself comes back
    # and no positional [0] lookup is needed.
    fetched_location = ti.xcom_pull(key='df_location', task_ids='creating_dataframe')
    if not fetched_location:
        # Fail with a clear message instead of an obscure read_csv error.
        raise ValueError(
            "No 'df_location' XCom value found for task 'creating_dataframe'"
        )

    df = pd.read_csv(fetched_location, index_col=0, sep=';')
    new_df = pd.DataFrame(
        {'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
        index=[pd.Timestamp.now()],
    )
    pd.concat([df, new_df]).to_csv(fetched_location, sep=";")
    print("second function")
# Demo DAG: three tasks chained in order (task1 -> task2 -> task3),
# scheduled with "@once".
with DAG(
    dag_id="richards_airflow_demo",
    schedule_interval="@once",
    start_date=datetime(2022, 2, 17 ),
    catchup=False,
    tags=["this is a demo of airflow","adding row"],
) as dag:
    # Creates the demo CSV if it is missing and pushes its path to XCom
    # under the key 'df_location'.
    task1 = PythonOperator(
        task_id="creating_dataframe",
        python_callable=creating_dataframe,
        do_xcom_push=True
    )
    # Appends one row to the CSV whose path task1 published.
    task2 = PythonOperator(
        task_id='adding_row_to_dataframe',
        python_callable=adding_row_to_dataframe
    )
    # NOTE(review): as written, this command only echoes the text
    # "python /opt/scripts/test.py"; it does not execute the script.
    # Drop the echo '...' wrapper if the script is meant to run here - confirm.
    task3 = BashOperator(
        task_id='python_bash_script',
        bash_command=r"echo 'python /opt/scripts/test.py'"
    )
    task1 >> task2 >> task3
Python script referenced by the BashOperator task (/opt/scripts/test.py):
import pandas as pd

# Location of the demo CSV that this script appends to.
DEMO_CSV = '/opt/airflow/dags/demo.csv'


def append_demo_entry(csv_path=DEMO_CSV):
    """Append one timestamped row to the semicolon-separated CSV at *csv_path*.

    The file is read with its first column as the index, a single row
    indexed by the current timestamp is appended, and the result is written
    back to the same path.

    Args:
        csv_path: Path of the CSV file to update; defaults to ``DEMO_CSV``.
    """
    df = pd.read_csv(csv_path, index_col=0, sep=';')
    new_df = pd.DataFrame(
        {'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
        index=[pd.Timestamp.now()],
    )
    pd.concat([df, new_df]).to_csv(csv_path, sep=';')


# Guarded so that merely importing this module never touches the CSV; the
# file is only modified when the script is run directly.
if __name__ == "__main__":
    append_demo_entry()