Construct list using jinja2 for KubernetesPodOperator in Airflow

Time:10-20

We have an application running in a pod that I want to trigger with Airflow. The application processes a lot of entities and takes a long time. The nature of our setup is that some of these entities might fail, and we want to be able to re-run the application for only one or a few of them:

my_program # Run full application

my_program -e entity1 -e entity2 # Run application limited to entity1 and entity2.

My plan was to allow users to trigger the DAG again with a list of entities using the "Trigger with config" in the Airflow UI and have that limit the DAG using the {{ dag_run.conf }} options.

The problem I now face is that the KubernetesPodOperator expects a list of strings, and I do not understand how to use Jinja to construct a list whose length I do not know in advance.

This is what I tried, but of course the Jinja is not templated this way. I understand how to insert templated strings into the list, but not how to do it when I do not know the length of the list in advance.

with DAG(
    "my_dag",
    description="Run my dag",
    schedule_interval="@daily",
    start_date=datetime.datetime(2021, 10, 14),
    default_args=default_args,
) as dag:

    entities = """{%- for entity in dag_run.conf['entities'] -%} -p {{ entity }} {% endfor %}"""
    arguments = list(filter(None, ['my_program', *entities.split(' ')]))

    t1 = KubernetesPodOperator(
        task_id="my_task_id",
        image="url_to_docker_image:latest",
        name="my_task_name",
        arguments=arguments,
        is_delete_operator_pod=True,
        env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
    )
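
To see why this first attempt cannot work, it may help to run the list construction on its own. The sketch below needs neither Airflow nor Jinja: it only shows that the split happens when the DAG file is parsed, while the template is still plain text, so the operator receives fragments of the template rather than entity names.

```python
# The template is still an ordinary Python string when the DAG file is parsed;
# nothing has been rendered yet.
entities = """{%- for entity in dag_run.conf['entities'] -%} -p {{ entity }} {% endfor %}"""

# Same list construction as in the DAG above.
arguments = list(filter(None, ["my_program", *entities.split(" ")]))

# Each element is a piece of template syntax, not a rendered value.
for item in arguments:
    print(repr(item))
```

Each element is later rendered separately, so pieces such as '{{' or '{%-' are no longer valid templates by themselves.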

Edit: Here is my second attempt, using Jinja and render_template_as_native_obj=True:

with DAG(
    "my_dag",
    description="Run my dag",
    schedule_interval="@daily",
    start_date=datetime.datetime(2021, 10, 14),
    default_args=default_args,
    render_template_as_native_obj=True,
) as dag:


    arguments = """['my_program', {% if entities is defined %}
      {%- for entity in entities-%} '-p', '{{ entity }}', {% endfor %}
      {%- endif %}]
    """

    t1 = KubernetesPodOperator(
        task_id="my_task_id",
        image="url_to_docker_image:latest",
        name="my_task_name",
        arguments=arguments, # type: ignore
        is_delete_operator_pod=True,
        env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
    )

But this seems to not be converted to a list properly:

HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Args: []string: decode slice: expect [ or n, but found \", error found in #10 byte of ...|{\"args\": \"['my_program|...

CodePudding user response:

The second approach worked with a minor tweak. First, the variable was not available under the name used in the (stripped-down) example: the parameters have to be fetched from dag_run.conf['entities'] instead of just entities.

The second problem was what counts as valid input for Jinja to convert to a Python object: I had to remove the whitespace at the end of the string as well as the newline characters:

arguments = """['my_program', {% if dag_run.conf['entities'] is defined %}
  {%- for entity in dag_run.conf['entities']-%} '-p', '{{ entity }}', {% endfor %}
  {%- endif %}]
""".replace('\n','').strip()

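The cleaned-up template can be tried outside Airflow with Jinja's NativeEnvironment. This is only a sketch: fake_dag_run is a hypothetical stand-in built with SimpleNamespace, not Airflow's real DagRun object, and render_arguments is a helper name introduced here for illustration.

```python
from types import SimpleNamespace

from jinja2.nativetypes import NativeEnvironment

# Same template as above, with newlines and surrounding whitespace removed.
TEMPLATE = """['my_program', {% if dag_run.conf['entities'] is defined %}
  {%- for entity in dag_run.conf['entities']-%} '-p', '{{ entity }}', {% endfor %}
  {%- endif %}]
""".replace("\n", "").strip()


def render_arguments(conf):
    """Render TEMPLATE against a stand-in dag_run that carries the given conf."""
    # Hypothetical stand-in for Airflow's DagRun; only the .conf attribute is used.
    fake_dag_run = SimpleNamespace(conf=conf)
    return NativeEnvironment().from_string(TEMPLATE).render(dag_run=fake_dag_run)


with_entities = render_arguments({"entities": ["entity1", "entity2"]})
without_entities = render_arguments({})

print(type(with_entities), with_entities)
print(type(without_entities), without_entities)
```

Note that the entity names are pasted between single quotes, so a name that itself contains a single quote would break the list literal.
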
CodePudding user response:

You are on the right track with your second attempt, but the template in your arguments variable leaves an extra comma (',') after the last entity.

import jinja2
from jinja2.nativetypes import NativeEnvironment

env = NativeEnvironment()
template = env.from_string(arguments)
print(template.render(entities=range(5)))

Outputs: ['my_program', '-p', '0', '-p', '1', '-p', '2', '-p', '3', '-p', '4', ]

If you change your arguments variable to this:

arguments = """
   ['my_program' {% if entities is defined %}
   {%- for entity in entities-%}, '-p', '{{ entity }}' {% endfor %}
   {%- endif %}]
   """

The output is now a string that Jinja can convert to a Python list: ['my_program' , '-p', '0' , '-p', '1' , '-p', '2' , '-p', '3' , '-p', '4' ]
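
As a side check, both output strings quoted in this answer can be fed to Python's ast.literal_eval. This sketch assumes that Jinja's native mode relies on the same kind of literal evaluation, which is worth confirming for the Jinja version you run; the two strings are copied from the outputs above.

```python
import ast

# Output of the original template, with a comma after the last entity.
with_trailing_comma = (
    "['my_program', '-p', '0', '-p', '1', '-p', '2', '-p', '3', '-p', '4', ]"
)
# Output of the rewritten template, with the comma placed before each entity.
without_trailing_comma = (
    "['my_program' , '-p', '0' , '-p', '1' , '-p', '2' , '-p', '3' , '-p', '4' ]"
)

parsed_with = ast.literal_eval(with_trailing_comma)
parsed_without = ast.literal_eval(without_trailing_comma)

print(parsed_with)
print(parsed_with == parsed_without)
```

If both strings parse to the same list on your interpreter, the trailing comma alone may not be what prevents the conversion. Stray whitespace or newlines around the literal, as described in the first answer, are another candidate to rule out.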
