My example DAG:
Task1-->Task2-->Task3
I have a pipeline with a BashOperator task that should not stop (at least for a few hours).
Task1: It watches a folder for zip files and extracts them to another folder
#!/bin/bash
# Watch the input folder; for each new zip, extract its CSVs and delete the zip
inotifywait -m /<path> -e create -e moved_to |
while read -r dir action file; do
    echo "The file '$file' appeared in directory '$dir' via '$action'"
    unzip -o -q "/<path>/$file" "*.csv" -d /<output_path>
    rm "/<path>/$file"
done
Task2: PythonOperator (cleans the CSV files and loads them into a MySQL database)
The problem is that Task1 keeps running forever because of the loop, and I want the DAG to move on to the next task after (execution_date + x hours).
I was thinking of changing the trigger rules of the downstream task. I have tried setting execution_timeout on the BashOperator, but the task then shows as failed in the graph view.
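Roughly, this is how I wired it up when trying execution_timeout (the dag_id, task ids, timeout value, and load function below are simplified placeholders):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def clean_and_load_csv():
    ...  # the cleaning and MySQL load happen here

with DAG(
    dag_id="zip_watch_pipeline",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    watch_and_extract = BashOperator(
        task_id="watch_and_extract",
        bash_command="/<path_to>/watch_zip.sh ",  # the inotifywait loop above; trailing space stops Jinja from treating .sh as a template
        execution_timeout=timedelta(hours=3),     # watcher gets killed after 3 hours and is marked failed
    )
    load_csv = PythonOperator(
        task_id="load_csv_to_mysql",
        python_callable=clean_and_load_csv,
    )
    watch_and_extract >> load_csv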
What should be my approach to solve this kind of problem?
CodePudding user response:
There are several ways to address the issue you are facing.
Option 1: Use execution_timeout on the parent task and trigger_rule='all_done' on the child task. This is basically what you suggested, but to clarify: Airflow doesn't mind that one of the tasks in the pipeline has failed. In your use case you describe this as a valid state for the task, so it's OK, but it's not very intuitive, since people usually associate "failed" with something going wrong, so it's understandable that this isn't the preferred solution.
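Concretely, you keep the execution_timeout you already set on the watcher and only change the downstream task. A sketch (dag_id, task id, and callable are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def clean_and_load_csv():
    ...  # placeholder: clean the CSVs and load them into MySQL

with DAG(
    dag_id="zip_watch_pipeline",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    load_csv = PythonOperator(
        task_id="load_csv_to_mysql",
        python_callable=clean_and_load_csv,
        trigger_rule="all_done",  # run even though the upstream watcher timed out and is marked failed
    )
    # keep the BashOperator with execution_timeout as in the question:
    # watch_and_extract >> load_csv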
Option 2: Airflow has AirflowSkipException. You can set a timer in your Python code; if the timer exceeds the limit you defined, then raise:
from airflow.exceptions import AirflowSkipException
raise AirflowSkipException("Snap. Time is OUT")
This will set the parent task's status to Skipped; the child task can then use trigger_rule='none_failed'. That way, if the parent task fails, it's due to an actual failure (not a timeout). A valid run will end with either Success or Skipped status.
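One possible sketch of this option: move the watcher into a PythonOperator that launches your bash loop as a subprocess and skips itself when time runs out. The script path, the 3-hour cutoff, the dag_id, and the task ids are just example values:

import subprocess
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

def watch_and_extract(max_runtime=timedelta(hours=3)):
    """Run the inotifywait/unzip loop, but stop and skip after max_runtime."""
    deadline = time.monotonic() + max_runtime.total_seconds()
    # launch the bash watcher from the question as a child process (path is a placeholder)
    proc = subprocess.Popen(["/<path_to>/watch_zip.sh"])
    try:
        while time.monotonic() < deadline:
            if proc.poll() is not None:
                if proc.returncode != 0:
                    # the script died on its own: this is a real failure
                    raise RuntimeError(f"watcher exited with code {proc.returncode}")
                return
            time.sleep(30)
        # time is up: this is the expected way out, so mark the task Skipped
        raise AirflowSkipException("Snap. Time is OUT")
    finally:
        if proc.poll() is None:
            proc.terminate()

def clean_and_load_csv():
    ...  # placeholder: clean the CSVs and load them into MySQL

with DAG(
    dag_id="zip_watch_pipeline",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    watch_task = PythonOperator(
        task_id="watch_and_extract",
        python_callable=watch_and_extract,
    )
    load_task = PythonOperator(
        task_id="load_csv_to_mysql",
        python_callable=clean_and_load_csv,
        trigger_rule="none_failed",  # runs after success or skip, but not after a real failure
    )
    watch_task >> load_task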