I am using airflow 2.2. I am trying to send multiple files using airflow email operator. the files list will be generated dynamically and using XCom pull to get the list of files from the previous task. For some reason, email operator files parameter is NOT able to read the files list from XCom value. Kindly advise.
Error details:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
error_file=args.error_file,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/email.py", line 88, in execute
conn_id=self.conn_id,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 66, in send_email
**kwargs,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 99, in send_email_smtp
mime_charset=mime_charset,
File "/usr/local/lib/python3.7/site-packages/airflow/utils/email.py", line 157, in build_mime_message
with open(fname, "rb") as file:
FileNotFoundError: [Errno 2] No such file or directory: '['
[2022-09-18, 17:24:22 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 1
[2022-09-18, 17:24:23 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
import os
from datetime import datetime, timedelta
default_args = {
"owner": 'TEST',
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
}
with DAG(
dag_id="test9_email_operator_dag",
default_args=default_args,
start_date=datetime(2022, 9, 14),
end_date=datetime(2022, 9, 15),
catchup=True,
max_active_runs=1,
schedule_interval="0 12 * * *", # Runs every day @ 8AM EST
) as dag:
def print_local_folder_files(local_temp_folder):
print("local folder files => ", os.listdir(local_temp_folder))
files_list = []
for file in os.listdir(local_temp_folder):
files_list.append(local_temp_folder file)
print("files_list => ", files_list)
return files_list
print_local_folder_files = PythonOperator(
task_id='print_local_folder_files',
python_callable=print_local_folder_files,
op_kwargs={'local_temp_folder': "/usr/local/airflow/dags/temp_dir/"},
do_xcom_push=True,
provide_context=True,
dag=dag)
send_email = EmailOperator(
task_id='send_email',
to='[email protected]',
subject='Test Email op Notification',
html_content='Test email op notification email. ',
files="{{ task_instance.xcom_pull(task_ids='print_local_folder_files') }}"
)
print_local_folder_files >> send_email
CodePudding user response:
You pushed a list to Xcom but Xcoms are rendered as string by default so what you have there is a string representation of list. This is why when you try to read it you get the first char because when you iterate over a string you get it char by char.
To solve your issue you should set render_template_as_native_obj=True
on the DAG object:
with DAG(
dag_id="test9_email_operator_dag",
...,
render_template_as_native_obj=True,
) as dag:
This will let Jinja engine know that you expect to render as native Python types so you will get a list rather than a string. For more information check Airflow docs on this feature.