I am creating tasks in a TaskGroup and trying to add the group to my DAG's sequence of tasks, but it throws this error:
Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
self._set_relatives(task_or_task_list, upstream=False)
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'
I am creating my task group and tasks like this:
def get_task_group(dag, task_group):
    t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
    t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
    t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
    t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
    t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
    t_list = [t2, t3, t4]
    t1.set_downstream(t_list)
    t5.set_upstream(t_list)

with DAG('some_dag', default_args=args) as dag:
    with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
        run_model_task_group = get_task_group(dag, tg)
    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)
    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)
If I leave the task group out of the sequence by removing the lines
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
I can see that a1, a2, a3 and a4 are sequenced properly, and I can see the disconnected run_model_task_group tasks. As soon as I add the group back into the sequence, I get the error above.
Can anyone explain what is happening here?
Note that I am using a function that takes dag and task_group parameters to create the task group's tasks because I want to create the same set of tasks for another DAG too.
Python Version: 3.8.8
Airflow Version: 2.0.1
CodePudding user response:
AttributeError: 'NoneType' object has no attribute 'update_relative'
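This happens because run_model_task_group is None. get_task_group() has no return statement, so the call returns None, and that None is what gets passed to set_downstream() and set_upstream(). The with block is not the cause: in Python, a name assigned inside a with block stays bound after the block ends.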
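The cause can be reproduced in plain Python, with no Airflow involved. The sketch below is only an illustration: build_without_return and build_with_return are hypothetical stand-ins for the two versions of get_task_group(), and a list stands in for the TaskGroup.

```python
from contextlib import nullcontext


def build_without_return(group):
    # Mirrors the original get_task_group(): does its work, never returns.
    group.append("t1")


def build_with_return(group):
    # Mirrors the fixed version: hands the group back to the caller.
    group.append("t1")
    return group


with nullcontext([]) as tg:
    missing = build_without_return(tg)
    present = build_with_return(tg)

# Both names are still bound after the with block ends;
# only the one assigned from the function without a return is None.
print(missing is None)
print(present is tg)
```

Passing a value like missing to set_downstream() is what produces the AttributeError on 'NoneType'.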
Without changing much of what you have so far, you could refactor get_task_group() to create and return a TaskGroup object, like this:
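from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

def get_task_group(dag, group_id):
    # Operators created inside the with block join the group automatically.
    with TaskGroup(group_id=group_id, dag=dag) as tg:
        t1 = DummyOperator(task_id='t1', dag=dag)
        t2 = DummyOperator(task_id='t2', dag=dag)
        t3 = DummyOperator(task_id='t3', dag=dag)
        t4 = DummyOperator(task_id='t4', dag=dag)
        t5 = DummyOperator(task_id='t5', dag=dag)
        t_list = [t2, t3, t4]
        t1.set_downstream(t_list)
        t5.set_upstream(t_list)
    # Return the group so the caller can wire it into the DAG.
    return tg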
In the DAG definition simply call it with:
run_model_task_group = get_task_group(dag, "run_model_tasks")
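The refactored function no longer passes task_group= to each operator, because operators created inside a with TaskGroup(...) block are attached to that group for you. The following is a toy model of that pattern in plain Python, offered only as a rough analogy: Group, Task and make_group are hypothetical names, and this is not Airflow's actual implementation.

```python
class Group:
    """Toy stand-in for a task group; not Airflow's TaskGroup."""

    _active = []  # stack of groups whose with block is currently open

    def __init__(self, group_id):
        self.group_id = group_id
        self.children = []

    def __enter__(self):
        Group._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        Group._active.pop()
        return False  # do not swallow exceptions


class Task:
    """Toy stand-in for an operator."""

    def __init__(self, task_id):
        self.task_id = task_id
        # Register with the innermost open group, if there is one.
        if Group._active:
            Group._active[-1].children.append(self)


def make_group(group_id):
    # Same shape as the refactored get_task_group(): build, then return.
    with Group(group_id) as g:
        Task("t1")
        Task("t2")
    return g


group = make_group("run_model_tasks")
outside = Task("a1")  # created with no group open, so it joins none
print([t.task_id for t in group.children])
```

Because the function returns the group, the same factory can be called from a second DAG with a different dag argument, which is the reuse the question asks for.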
In the resulting graph view, the run_model_tasks group sits between a2 and a3.
DAG definition:
from airflow import DAG
from airflow.utils.dates import days_ago

# default_args is assumed to be defined elsewhere in the file.
with DAG('some_dag',
         default_args=default_args,
         start_date=days_ago(2),
         schedule_interval='@once') as dag:
    # with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
    #     run_model_task_group = get_task_group(dag, )
    run_model_task_group = get_task_group(dag, "run_model_tasks")
    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)
    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)
Finally, consider using the bitshift operators (>> and <<) instead of set_downstream and set_upstream. The Airflow documentation recommends them, and they are less verbose: for example, a1 >> a2 >> run_model_task_group >> a3 >> a4.
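To see why a chain such as t1 >> [t2, t3, t4] >> t5 works even though one operand is a plain list, here is a minimal plain-Python sketch of the operator overloading involved. Node is a hypothetical toy class, not Airflow's BaseOperator, and the real implementation may differ in its details.

```python
class Node:
    """Toy stand-in for a task; not Airflow's BaseOperator."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def set_downstream(self, other):
        # Accept either a single node or a list of nodes.
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)

    def __rshift__(self, other):
        # node >> other is shorthand for node.set_downstream(other).
        self.set_downstream(other)
        return other  # returning the right-hand side allows chaining

    def __rrshift__(self, others):
        # [n1, n2] >> node: list defines no __rshift__, so Python
        # falls back to the right operand's __rrshift__.
        for n in others:
            n.set_downstream(self)
        return self


t1, t2, t3, t4, t5 = (Node(f"t{i}") for i in range(1, 6))
t1 >> [t2, t3, t4] >> t5

print([n.name for n in t1.downstream])
print([n.name for n in t3.downstream])
```

The single chained line replaces the t_list, set_downstream and set_upstream calls used in the function above.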
Let me know if that worked for you.
Tested with: Airflow version: 2.1.4, Python 3.8.10