Home > OS >  How to template outside an operator or python callable function
How to template outside an operator or python callable function

Time:08-18

I need to find a way to pull from xcom outside an operator.

Currently, I have a list being set and retrieved from the environment variable but im expecting lots of jobs coming through so i foresee a situation where different jobs are setting and retrieving from the same list which may cause an issue.

Ideally i just want to use my list without having to set as an environment variable and ive tried the below iterations. Any help on how to actually do this?

MWAA == Airflow 2.2.2

dag = DAG(
    dag_id='example_batch_submit_job',
    schedule_interval=None,
    start_date=datetime(2022, 8, 17),
    tags=['batch_job'],
    catchup=False)

def get_inputs(**kwargs):
    job_ids = kwargs['dag_run'].conf['job_ids']       # list passed on from api conf
    ti = kwargs['ti']
    ti.xcom_push(key='job_ids', value=job_ids)
    return job_ids

run_this = PythonOperator(
    task_id='get_input',
    provide_context=True,
    python_callable=get_inputs,
    dag=DAG,
)

job_ids = '{{ti.xcom_pull(task_ids="get_inputs", key="job_ids")}}' # <------- Tried this
job_ids = run_this.output                                          # <------- Also tried this.
job_ids = "{{ dag_run.conf['job_id'] }}"                           # <------- or straight from the conf

for id in job_ids:
    submit_batch_job = BatchOperator(
        task_id=f'submit_batch_job_{id}',
        job_name=JOB_NAME,
        job_queue=JOB_QUEUE,
        job_definition=JOB_DEFINITION,
        parameters={}
    )

CodePudding user response:

Templating happens as part of task execution thus you can not template if task is not invoked.

What you are looking for seems to be the ability to create tasks based on previous task output for that you need Dynamic Task Mapping but this is a feature added in Airflow 2.3.0

You can not achieve that in earlier Airflow versions.

  • Related