Home > Mobile >  airflow 2.3.3 SparkKubernetesOperator with SparkKubernetesSensor in TaskGroup
airflow 2.3.3 SparkKubernetesOperator with SparkKubernetesSensor in TaskGroup

Time:09-01

I have SparkKubernetesOperator >> SparkKubernetesSensor dependency. It works fine when outside of TaskGroup but it does not work when put within TaskGroup because it complains about metadata field.

What am I doing group.

with TaskGroup("tg-task-1", default_args=default_args) as tg_task_1:
    task_1 = SparkKubernetesOperator(
                task_id='task-1',
                namespace="batch",
                application_file="k8s/task-1.yaml",
                do_xcom_push=True,
                dag=dag,
            )

    task_1_sensor = SparkKubernetesSensor(
        task_id='task-1-sensor',
        namespace="batch",
        application_name="{{ task_instance.xcom_pull(task_ids='task-1')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
        dag=dag,
        attach_log=True,
    )

this is the error I get

jinja2.exceptions.UndefinedError: 'None' has no attribute 'metadata'

CodePudding user response:

I have just realised the task_ids differs when tasks are defined within TaskGroup. So, the tasks_ids shoul be:

application_name="{{ task_instance.xcom_pull(task_ids='tg-task-1.task-1')['metadata']['name'] }}",
  • Related