Home > OS >  How to skip a always running task in airflow after few hours?
How to skip a always running task in airflow after few hours?

Time:06-22

My example DAG:

Task1-->Task2-->Task3

I have a pipeline with a BashOperator task that should not stop (at least for a few hours).

Task1: It watches a folder for zip files and extracts them to another folder

#!/bin/bash
inotifywait -m /<path> -e create -e moved_to|
    while read dir action file; do
        echo "The file '$file' appeared in directory '$dir' via '$action'"
        unzip -o -q "/<path>/$file" "*.csv" -d /<output_path>
        rm path/$file
    done 

Task2: PythonOperator(loads the CSV into MySQL database after cleaning)

The problem is that my task is always running due to the loop, and I want it to proceed to the next task after (execution_date x hours).

I was thinking of changing the trigger rules of the downstream task.I have tried the execution_timeout in BashOperator but the task shows as failed on the graph.

What should be my approach to solve this kind of problem?

CodePudding user response:

There are several ways to address the issue you are facing.

Option 1: Use execution_time on parent task and trigger_rule='all_done' on child task. This is basically what you suggested but just for clarifications - Airflow doesn't mind that one of the task in the pipeline has failed. In your use case you describe it as a valid state for the task so it's OK but not very intuitive as people often associate failed with something that is wrong so it's understandable that this is not the preferred solution.

Option 2: Airflow has AirflowSkipException. You can set timer in your python code. If timer exceed the time you defined then do:

from airflow.exceptions import AirflowSkipException
raise AirflowSkipException(f"Snap. Time is OUT")

This will set parent task to status Skipped then the child task can use trigger_rule='none_failed'. In this way if parent task fails it's due to an actual failure (but not timeout). Valid execution will yield either success status or skipped.

  • Related