Home > database >  Pass Email list as an argument from other dag in airflow
Pass Email list as an argument from other dag in airflow

Time:06-16

I have created a custom email alert python file, I am calling that function in another dag to send email. Lets say - email alert file is custom_alert.py

from airflow.operators.email import EmailOperator
from airflow.utils.email import send_email

def custom_failure_email(context, **kwargs):
    """Send custom email alerts."""
    dag_run = context.get('task_instance').dag_id
    subject = f"[ActionReq]-dag failure-{dag_run}"
    # email contents
    body= """Hi Team,<br><br>
            <b style="font-size:15px;color:red;">Airflow job on error, please find details below.</b>
            Thank you!,<br>
            )
    email_list = ['[email protected]', '[email protected]']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

In Parent DAG : Lets Say **email.py** - I am calling the above function to send failure email.

from custom_alert import custom_failure_email
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email}

This way I am able to send the custom email, but my recipient list is same for every dag. Please let me know, How I can customize it to send different recipient email address for different dag. How I can pass recipient email address from Parent Dag.?

CodePudding user response:

An option is to set params dict in your DAG then retrieve this value from the context.

def custom_failure_email(context):
    """Send custom email alerts."""
    #...
    email_list = context['dag'].params['mailing_list']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

default_args = {
    #....
    'params': {
        'mailing_list': ['[email protected]', '[email protected]']
    },
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email
}

CodePudding user response:

There is one more way to achieve this using functools.

from functools import partial
new_custom_failure_email = partial(custom_failure_email,
                                   email_list=['[email protected]'])
                                                                    
# Now Pass this in default _args
'on_failure_callback': new_custom_failure_email

and we need to pass email_list as argument in **custom_alert.py**

  • Related