Home > OS >  airflow kubernetes spark job submission retry task group and not sensor only
airflow kubernetes spark job submission retry task group and not sensor only

Time:09-03

Airflow 2.3.3

I have Dag with two TaskGroups. Each TaskGroup has two tasks:

t1: SparkKubernetesOperator >> t2: SparkKubernetesSensor

t1 submits spark job into kubernetes cluster using spark operator deployment yaml file. it goes into dark green SUCCESS state instantly.

t2 monitors the execution of t1. if spark job is Running then it takes ~10min for completion and then t2 goes into Success status.

I have the situation then submited spark job gets ERROR: UnknownHostException and this is when I want to retry but I want to retry whole TaskGroup and not only t2.

I now it is not possible to retry whole TaskGroup.

How to correctly retry and submit spark job into k8s task through airflow 2.3.3?

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from alerts import slack_alert


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'max_active_runs': 1,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
    'on_failure_callback': slack_alert,
}


with DAG(
    "some-dag-name", 
    default_args=default_args,
    description='submit some-dag-name',
    schedule_interval="30 4 * * *",
    start_date = datetime(2022, 8, 27),
    ) as dag:

    with TaskGroup("tg-some-task-name", default_args=default_args,) as tx_some_task_name:
        task_some_task_name = SparkKubernetesOperator(
            task_id='some-task-name',
            namespace="batch",
            application_file="k8s/some-task-name.yaml",
            do_xcom_push=True,
            dag=dag,
        )

        task_some_task_name_sensor = SparkKubernetesSensor(
            task_id='some-task-name-sensor',
            namespace="batch",
            application_name="{{ task_instance.xcom_pull(task_ids='tg-some-task-name.some-task-name')['metadata']['name'] }}",
            kubernetes_conn_id="kubernetes_default",
            dag=dag,
            retries=1,
            attach_log=True,
        )
        
        task_some_task_name >> task_some_task_name_sensor
        
        
    with TaskGroup("tg-some-other-task", default_args=default_args,) as tx_some_other_task:
        task_some_other_task = SparkKubernetesOperator(
            task_id='some-other-task',
            namespace="batch",
            application_file="k8s/some-other-task.yaml",
            do_xcom_push=True,
            dag=dag,
        )

        task_some_other_task_sensor = SparkKubernetesSensor(
            task_id='some-other-task-sensor',
            namespace="batch",
            application_name="{{ task_instance.xcom_pull(task_ids='tg-some-other-task.some-other-task')['metadata']['name'] }}",
            kubernetes_conn_id="kubernetes_default",
            dag=dag,
            retries=1,
            attach_log=True,
        )
        
        task_some_task_name_sensor >> task_some_other_task
    
    chain(task_some_other_task, task_some_other_task_sensor)

CodePudding user response:

Airflow TaskGroup doesn't support retry, so you cannot retry t1 when t2 fails if they are in the same TaskGroup.

But there is another component more suitable for your use case, which is SubDag, it's deprecated but still available in the last version, I think it will be removed once they add its features to TaskGroup (like the retry).

With SubDag, you can run a separate dag and configure its retry and conf, it will be visible in the graph of your main dag exactly like a TaskGroup. So you need just to create new dag contains your tasks t1 and t2, then replace the TaskGroup by a task instance of SubDagOperator which run this dag.

CodePudding user response:

this is the concept:

sensor exposes callback which calls a function to which parameters are passed with defined upstream tasks to be cleared on retry.

sensor fails when retry value is reached.

utils.py

from airflow.models import taskinstance
from airflow.utils.db import provide_session


@provide_session
def clear_tasks(tis, session=None, activate_dag_runs=False, dag=None) -> None:
    taskinstance.clear_task_instances(
        tis=tis,
        session=session,
        activate_dag_runs=activate_dag_runs,
        dag=dag,
    )


def clear_upstream_task(context):
    tasks_to_clear = context["params"].get("tasks_to_clear", [])
    all_tasks = context["dag_run"].get_task_instances()
    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in tasks_to_clear]
    clear_tasks(tasks_to_clear, dag=context["dag"])

then the task group:

from utils.callback_util import clear_upstream_task

with TaskGroup("tg-task", default_args=default_args) as some_task:
    task1 = SparkKubernetesOperator(
        task_id='task1',
        namespace="batch",
        application_file="k8s/task1.yaml",
        do_xcom_push=True,
        dag=dag,
    )

    task_proxy_tx_1d_parsed_sensor = SparkKubernetesSensor(
        task_id='task1-sensor',
        namespace="batch",
        application_name="{{ task_instance.xcom_pull(task_ids='tg-task.task1')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
        dag=dag,
        attach_log=True,
        params={"tasks_to_clear": ["tg-task.task1"]},
        on_retry_callback=clear_upstream_task
    )

    task1 >> task1_sensor
  • Related