So I was upgrading DAGs from airflow version 1.12.15 to 2.2.2 and DOWNGRADING python from 3.8 to 3.7 (since MWAA doesn't support python 3.8). The DAG is working fine on the previous setup but shows this error on the MWAA setup:
Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
This is the built-in function that seems to be failing:
def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Set a task or a task list to be directly downstream from the current
task. Required by TaskMixin.
"""
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
There is the code we are trying to run in the DAG:
for report in reports:
dag << PythonOperator(
task_id=f"task_{report}",
python_callable=process,
op_kwargs={
"conn": "snowflake_production",
"table": report,
},
provide_context=True,
)
I am thinking this transition from Python 3.8 to 3.7 is causing this issue but I am not sure.
Did anyone run across a similar issue ?
CodePudding user response:
For Airflow>=2.0.0 Assigning task to a DAG using bitwise shift (bit-shift) operators are no longer supported.
Trying to do:
dag = DAG("my_dag")
dummy = DummyOperator(task_id="dummy")
dag >> dummy
Will not work.
Dependencies should be set only between operators.
You should use context manager:
with DAG("my_dag") as dag:
dummy = DummyOperator(task_id="dummy")
It already handles the relations of operator to DAG object.
If you prefer not to, then use the dag parameter in the operator constructor as: DummyOperator(task_id="dummy", dag=dag)