I'm trying to run my Python project with Airflow 2.0.1 in Docker.
Here is the directory structure:
DAGs are here: path_to_airflow/dags/
Python project code is here: path_to_airflow/dags/utils
I ran into strange Airflow behavior when dealing with exceptions: my PythonOperator task always finishes with a success mark and an exit code 0 status, no matter whether an exception is raised or not. Can somebody help me solve this issue?
Here is the DAG code:
from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from utils.main import main

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1, 0, 0),
    max_active_runs=1,
    catchup=False,
)

task_dummy = DummyOperator(
    task_id='task_dummy',
    dag=dag,
)

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=main,
    dag=dag,
)

task_dummy >> task_1
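Side note: airflow.operators.python_operator is the pre-2.0 import path and only emits a deprecation warning here; in Airflow 2 the current module is airflow.operators.python:

from airflow.operators.python import PythonOperator  # non-deprecated Airflow 2.x import path

This shouldn't change the exception behavior either way.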
Here is the main function code:
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from airflow.exceptions import AirflowException

def main(data_path='/opt/airflow/dags/data/'):
    # ... some code ...
    if condition:
        raise AirflowException('empty data')

if __name__ == '__main__':
    main()
All exceptions can be caught if I replace the PythonOperator with a BashOperator, but I just want to understand what is wrong. For testing, you can simply raise the exception unconditionally in the main function, as in the sketch below.
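For example, this stripped-down main (just an illustration, the rest of the body is elided) should always fail the task:

from airflow.exceptions import AirflowException

def main(data_path='/opt/airflow/dags/data/'):
    # raise unconditionally so task_1 should always end up marked failed
    raise AirflowException('empty data')

Thank you for your time.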
CodePudding user response:
As discussed in the comments, the issue was a wrong observation: when the python_callable raises, the PythonOperator does mark the task as failed.
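A quick way to double-check is to run the task in isolation with the Airflow CLI from inside the container (the DAG id 'test' and task id 'task_1' come from the code above):

airflow tasks test test task_1 2021-01-01

With the unconditional raise in place, the AirflowException traceback is printed and the command exits with an error, confirming the exception propagates.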