Home > database >  Airflow - email operator sending multiple files issue
Airflow - email operator sending multiple files issue

Time:09-21

I am using airflow 2.2. I am trying to send multiple files using airflow email operator. the files list will be generated dynamically and using XCom pull to get the list of files from the previous task. For some reason, email operator files parameter is NOT able to read the files list from XCom value. Kindly advise.

Error details:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/email.py", line 88, in execute
    conn_id=self.conn_id,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 66, in send_email
    **kwargs,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 99, in send_email_smtp
    mime_charset=mime_charset,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 157, in build_mime_message
    with open(fname, "rb") as file:
FileNotFoundError: [Errno 2] No such file or directory: '['
[2022-09-18, 17:24:22 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 1
[2022-09-18, 17:24:23 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

import os
from datetime import datetime, timedelta

default_args = {
    "owner": 'TEST',
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

with DAG(
        dag_id="test9_email_operator_dag",
        default_args=default_args,
        start_date=datetime(2022, 9, 14),
        end_date=datetime(2022, 9, 15),
        catchup=True,
        max_active_runs=1,
        schedule_interval="0 12 * * *",  # Runs every day @ 8AM EST
) as dag:

    def print_local_folder_files(local_temp_folder):
        print("local folder files => ", os.listdir(local_temp_folder))
        files_list = []
        for file in os.listdir(local_temp_folder):
            files_list.append(local_temp_folder   file)
        print("files_list => ", files_list)
        return files_list

    print_local_folder_files = PythonOperator(
        task_id='print_local_folder_files',
        python_callable=print_local_folder_files,
        op_kwargs={'local_temp_folder': "/usr/local/airflow/dags/temp_dir/"},
        do_xcom_push=True,
        provide_context=True,
        dag=dag)

    send_email = EmailOperator(
        task_id='send_email',
        to='[email protected]',
        subject='Test Email op Notification',
        html_content='Test email op notification email. ',
        files="{{ task_instance.xcom_pull(task_ids='print_local_folder_files') }}"
    )

    print_local_folder_files >> send_email
    

CodePudding user response:

You pushed a list to Xcom but Xcoms are rendered as string by default so what you have there is a string representation of list. This is why when you try to read it you get the first char because when you iterate over a string you get it char by char.

To solve your issue you should set render_template_as_native_obj=True on the DAG object:

with DAG(
        dag_id="test9_email_operator_dag",
        ...,
        render_template_as_native_obj=True,
) as dag:

This will let Jinja engine know that you expect to render as native Python types so you will get a list rather than a string. For more information check Airflow docs on this feature.

  • Related