As shown in the image above, there are many upstream tasks associated with oneTaskFailed
.
I set that as long as one task fails, an email alert will be triggered.
But when I send an email, how do I determine the task ID that triggered the alert and send a log link of the task at the same time?
CodePudding user response:
In the onTaskFailed you can get the dag_run from the context.
And then you can find the TaskInstances that run in the current dag and find the one with the error state.
def on_failed_task(**context):
dagrun: DAG = context["dag_run"]
failed_ti = [ti for ti in dagrun.get_task_instances() if ti.state == State.FAILED]
print(failed_ti)