Home > Software engineering >  Apache Airflow stuck in a loop executing last task (bash operator executing a python script)
Apache Airflow stuck in a loop executing last task (bash operator executing a python script)

Time:02-21

I am running Airflow in a Docker container on my local machine. I'm running a test DAG with three tasks. The three tasks run fine; however, the last task, which uses the BashOperator, is stuck in a loop, as shown in the picture at the bottom. Looking in the log file, an entry is generated only for the first execution of the Python script run by the bash task, then nothing more is logged, yet the Python file keeps getting executed. Any suggestions as to what the issue could be?

Thanks,

Richard

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def creating_dataframe(ti, loc=r'/opt/airflow/dags/', filename=r'demo.csv'):
    """Make sure the demo CSV exists and publish its path through XCom.

    The full path is pushed under the XCom key ``'df_location'`` before the
    file is touched. If no file exists at that path, a one-row CSV
    (``;``-separated, indexed by the current timestamp) is created; an
    existing file is left untouched.

    Args:
        ti: Object exposing ``xcom_push(key=..., value=...)``.
        loc: Directory that holds the CSV. Defaults to the original
            hard-coded location.
        filename: Name of the CSV file inside ``loc``.

    Returns:
        The full path of the CSV file as a string.
    """
    import os

    import pandas as pd

    # Build the path once; os.path.join also copes with a ``loc`` that has
    # no trailing slash, while giving the same result for the defaults.
    df_location = os.path.join(loc, filename)
    ti.xcom_push(key='df_location', value=df_location)

    if os.path.exists(df_location):
        print("if exists")
        return df_location

    df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['First entry']},
                      index=[pd.Timestamp.now()])
    df.to_csv(df_location, sep=';')
    print("does not exist")
    return df_location


def adding_row_to_dataframe(ti):
    """Append one timestamped row to the CSV whose path was shared via XCom.

    The path is taken as the first element of what ``ti.xcom_pull`` returns
    for key ``'df_location'`` and task ids ``['creating_dataframe']``. The
    ``;``-separated file is read, one row is appended, and the result is
    written back to the same path.

    Args:
        ti: Object exposing ``xcom_pull(key=..., task_ids=...)``.
    """
    import pandas as pd

    pulled = ti.xcom_pull(key='df_location', task_ids=['creating_dataframe'])
    csv_path = pulled[0]

    existing = pd.read_csv(csv_path, index_col=0, sep=';')
    extra_row = pd.DataFrame(
        {'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
        index=[pd.Timestamp.now()],
    )

    # Overwrite the same file with the old rows followed by the new one.
    pd.concat([existing, extra_row]).to_csv(csv_path, sep=";")
    print("second function")

# DAG definition: two Python tasks followed by one bash task, run in sequence.
with DAG(
    dag_id="richards_airflow_demo",
    schedule_interval="@once",
    start_date=datetime(2022, 2, 17),
    catchup=False,
    tags=["this is a demo of airflow", "adding row"],
) as dag:

    # Step 1: calls creating_dataframe, which pushes the CSV path to XCom.
    create_csv_task = PythonOperator(
        task_id="creating_dataframe",
        python_callable=creating_dataframe,
        do_xcom_push=True,
    )

    # Step 2: calls adding_row_to_dataframe, which pulls that path by the
    # task id "creating_dataframe" and appends a row.
    append_row_task = PythonOperator(
        task_id='adding_row_to_dataframe',
        python_callable=adding_row_to_dataframe,
    )

    # Step 3: bash task.
    # NOTE(review): the command is an `echo` of the quoted text, so the shell
    # prints "python /opt/scripts/test.py" instead of running the script --
    # confirm whether that is intended.
    bash_script_task = BashOperator(
        task_id='python_bash_script',
        bash_command=r"echo 'python /opt/scripts/test.py'",
    )

    # Linear ordering: create -> append -> bash.
    create_csv_task >> append_row_task
    append_row_task >> bash_script_task

Bash python script:

import pandas as pd

# Read the demo CSV, append one row stamped with the current time, and
# write the result back to the same file.
existing_rows = pd.read_csv(
    '/opt/airflow/dags/demo.csv',
    index_col=0,
    sep=';',
)
appended_row = pd.DataFrame(
    {'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
    index=[pd.Timestamp.now()],
)
combined = pd.concat([existing_rows, appended_row])

combined.to_csv('/opt/airflow/dags/demo.csv', sep=';')

Solution

  • Related