I have a SparkKubernetesOperator >> SparkKubernetesSensor dependency. It works fine outside of a TaskGroup, but it fails when both tasks are placed inside a TaskGroup: the sensor's template complains about the metadata field.
What am I doing wrong?
with TaskGroup("tg-task-1", default_args=default_args) as tg_task_1:
    task_1 = SparkKubernetesOperator(
        task_id='task-1',
        namespace="batch",
        application_file="k8s/task-1.yaml",
        do_xcom_push=True,
        dag=dag,
    )
    task_1_sensor = SparkKubernetesSensor(
        task_id='task-1-sensor',
        namespace="batch",
        application_name="{{ task_instance.xcom_pull(task_ids='task-1')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
        dag=dag,
        attach_log=True,
    )
This is the error I get:
jinja2.exceptions.UndefinedError: 'None' has no attribute 'metadata'
CodePudding user response:
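I have just realised that the task_ids differ when tasks are defined within a TaskGroup: the group ID is prepended to the task ID, so xcom_pull with the bare 'task-1' finds nothing and returns None. The task_ids value should therefore be: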
application_name="{{ task_instance.xcom_pull(task_ids='tg-task-1.task-1')['metadata']['name'] }}",
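
To make the naming rule concrete, here is a minimal sketch in plain Python, with no Airflow import. It assumes the TaskGroup keeps its default prefix_group_id=True, in which case Airflow registers each child task as "<group_id>.<task_id>". The helper names qualified_task_id and application_name_template are hypothetical, introduced only for illustration, and are not part of Airflow.

```python
from typing import Optional


def qualified_task_id(task_id: str, group_id: Optional[str] = None) -> str:
    """Return the ID a task is registered under (hypothetical helper).

    Assumes the enclosing TaskGroup uses the default prefix_group_id=True,
    so the group ID and a dot are prepended to the task ID.
    """
    return f"{group_id}.{task_id}" if group_id else task_id


def application_name_template(task_id: str, group_id: Optional[str] = None) -> str:
    """Build the Jinja template string for the sensor's application_name."""
    full_id = qualified_task_id(task_id, group_id)
    return (
        "{{ task_instance.xcom_pull(task_ids='"
        + full_id
        + "')['metadata']['name'] }}"
    )


if __name__ == "__main__":
    # Outside a TaskGroup the bare task ID is used.
    print(application_name_template("task-1"))
    # Inside TaskGroup("tg-task-1") the ID gains the group prefix.
    print(application_name_template("task-1", group_id="tg-task-1"))
```

If the prefix is not wanted, TaskGroup also accepts prefix_group_id=False, which should leave the task IDs unprefixed; in that case the task IDs must be unique across the whole DAG.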