I have an Apache Airflow managed environment running in which a number of DAGs are defined and enabled. Some DAGs are scheduled, running on a 15 minute schedule, while others are not scheduled. All the DAGs are single-task DAGs. The DAGs are structured in the following way:
level 2 DAGs -> (triggers) level 1 DAG -> (triggers) level 0 DAG
The scheduled DAGs are the level 2 DAGs, while the level 1 and level 0 DAGs are unscheduled. The level 0 DAG uses ECSOperator
to call a pre-defined Elastic Container Service (ECS) task, to call a Python ETL script inside a Docker container defined in the ECS task. The level 2 DAGs wait on the level 1 DAG to complete, which in turns waits on the level 0 DAG to complete. The full Python logs produced by the ETL scripts are visible in the CloudWatch logs from the ECS task runs, while the Airflow task logs only show high-level logging.
The singular tasks in the scheduled DAGs (level 2) have depends_on_past
set to False
, and I expected that as a result successive scheduled runs of a level 2 DAG would not depend on each other, i.e. that if a particular run failed it would not prevent the next scheduled run from occurring. But what is happening is that Airflow is overriding this and I can clearly see in the UI that a failure of a particular level 2 DAG run is preventing the next run from being selected by the scheduler - the next scheduled run state is being set to None
, and I have to manually clear the failed DAG run state before the scheduler can schedule it again.
Why does this happen? As far as I know, there is no Airflow configuration option that should override the task-level setting of False
for depends_on_past
in the level 2 DAG tasks. Any pointers would be greatly appreciated.
CodePudding user response:
Answering the question "why is this happening?". I understand that the behavior you are watching is explained by the fact that Tasks are being defined with wait_for_downstream = True
. The docs state the following about it:
wait_for_downstream (bool) -- when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully or be skipped before it runs. This is useful if the different instances of a task X alter the same asset, and this asset is used by tasks downstream of task X. Note that depends_on_past is forced to True wherever wait_for_downstream is used. Also note that only tasks immediately downstream of the previous task instance are waited for; the statuses of any tasks further downstream are ignored.
Keep in mind that the term previous instances of task X refers to the task_instance of the last scheduled dag_run, not the upstream Task (in a DAG with a daily schedule, that would be the task_instance from "yesterday").
This also explains why your Task are being executed once you clear the state of the previous DAG Run.
I hope it helps you clarifying things up!