I have created a custom email alert python file, I am calling that function in another dag to send email.
Lets say - email alert file is custom_alert.py
from airflow.operators.email import EmailOperator
from airflow.utils.email import send_email
def custom_failure_email(context, **kwargs):
"""Send custom email alerts."""
dag_run = context.get('task_instance').dag_id
subject = f"[ActionReq]-dag failure-{dag_run}"
# email contents
body= """Hi Team,<br><br>
<b style="font-size:15px;color:red;">Airflow job on error, please find details below.</b>
Thank you!,<br>
)
email_list = ['[email protected]', '[email protected]']
for i in range(len(email_list)):
send_email(str(email_list[i]),subject,body)
In Parent DAG : Lets Say **email.py**
- I am calling the above function to send failure email.
from custom_alert import custom_failure_email
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'on_failure_callback': custom_failure_email}
This way I am able to send the custom email, but my recipient list is same for every dag. Please let me know, How I can customize it to send different recipient email address for different dag. How I can pass recipient email address from Parent Dag.?
CodePudding user response:
An option is to set params
dict in your DAG
then retrieve this value from the context.
def custom_failure_email(context):
"""Send custom email alerts."""
#...
email_list = context['dag'].params['mailing_list']
for i in range(len(email_list)):
send_email(str(email_list[i]),subject,body)
default_args = {
#....
'params': {
'mailing_list': ['[email protected]', '[email protected]']
},
'email_on_failure': False,
'on_failure_callback': custom_failure_email
}
CodePudding user response:
There is one more way to achieve this using functools.
from functools import partial
new_custom_failure_email = partial(custom_failure_email,
email_list=['[email protected]'])
# Now Pass this in default _args
'on_failure_callback': new_custom_failure_email
and we need to pass email_list as argument in **custom_alert.py**