Airflow tasks iterating over list should run sequentially

Time:02-17

I am running tasks based on a list, with each task id derived from a list item. After these tasks complete, I want another task to execute. The code is as follows:

with DAG('test',) as dag:

    t1 = [
        PythonOperator(
            task_id=f"task_hour_{hours}",
            python_callable=hourly_job,
            op_kwargs={"hour": hours},
        )
        for hours in ['01', '02', '03']
    ]

    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job,
    )

    t1 >> t2

What happens is that the hourly tasks all run in parallel, each followed by the daily task, like this:

task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily

What I want is for the hourly tasks to execute sequentially, with the daily task executing last:

task_hour_01 >> task_hour_02 >> task_hour_03 >> daily

So there are two problems:

  • The tasks should run in sequence.
  • The daily task should be the last task to execute and should run only once.

CodePudding user response:

Based on this answer, you could use an auxiliary variable such as t0 to hold the previously created operator and link each new operator to it.

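# Import paths below assume Airflow 2.x
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('test',) as dag:
    hours = ['01', '02', '03']
    t0 = None  # previously created task; None before the first iteration

    for hour in hours:
        t1 = PythonOperator(
            task_id=f"task_hour_{hour}",  # use the loop variable, not the list
            python_callable=hourly_job,
            op_kwargs={"hour": hour},
        )
        if t0 is not None:
            t0 >> t1  # run after the previous hourly task
        t0 = t1

    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job,
    )

    # t1 is now the last hourly task, so daily runs once, after the whole chain
    t1 >> t2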
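
To see why the two wirings differ, here is a minimal sketch that runs without Airflow. The `Task` class is a hypothetical stand-in, not Airflow's operator: it only mimics the `>>` rule that a list on the left-hand side makes every element an upstream of the task on the right. The helper names `fan_in`, `sequential`, and `edges` are also made up for illustration.

```python
class Task:
    """Hypothetical stand-in for an operator; records upstream task ids only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []  # ids of tasks that must finish before this one

    def __rshift__(self, other):
        # task >> other: this task becomes an upstream of `other`
        other.upstream.append(self.task_id)
        return other

    def __rrshift__(self, others):
        # [a, b, c] >> task: every list element becomes an upstream of `task`
        for t in others:
            self.upstream.append(t.task_id)
        return self


def fan_in(hours):
    """Wiring from the question: a list of tasks >> one task."""
    hourly = [Task(f"task_hour_{h}") for h in hours]
    daily = Task("daily")
    hourly >> daily
    return hourly + [daily]


def sequential(hours):
    """Wiring from the answer: link each task to the previously created one."""
    previous = None
    tasks = []
    for h in hours:
        current = Task(f"task_hour_{h}")
        if previous is not None:
            previous >> current
        previous = current
        tasks.append(current)
    daily = Task("daily")
    if previous is not None:
        previous >> daily
    tasks.append(daily)
    return tasks


def edges(tasks):
    """Map each task id to the list of its upstream task ids."""
    return {t.task_id: t.upstream for t in tasks}


if __name__ == "__main__":
    print(edges(fan_in(["01", "02", "03"])))
    print(edges(sequential(["01", "02", "03"])))
```

In the first wiring no hourly task depends on another, so a scheduler is free to run them in parallel; in the second, each hourly task has exactly one upstream and daily depends only on the last one. If you are on Airflow 2.x, the `chain` helper from `airflow.models.baseoperator` should express the same linear wiring, for example `chain(*hourly_tasks, t2)`, but check this against the version you have installed.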