I'm having difficulty understanding how paths work in Airflow. I created this repository so that it is easy to see what I mean: https://github.com/remo2479/airflow_example/blob/master/dags/testdag.py I set up this repository from scratch following the manual on the Airflow page; I only deactivated the example DAGs.
As you can see in the only DAG (dags/testdag.py), the DAG contains two tasks and one variable declaration that reads from an opened file. Both tasks use the dummy SQL script in the repository (dags/testdag/testscript.sql). Once I used testdag/testscript.sql as the path (task 1) and once dags/testdag/testscript.sql (task 2). With a connection set up, task 1 would work and task 2 wouldn't, because the template cannot be found. This is how I would expect both tasks to behave, since the DAG is in the dags folder and we should not have to put it in the path.
But when I try to open testscript.sql and read its contents, I have to put "dags" in the path (dags/testdag/testscript.sql). Why does the path behave differently with the MsSqlOperator than with the open function?
For convenience, I put the whole script in this post:
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from datetime import datetime

with DAG(
    dag_id="testdag",
    schedule_interval="30 6 * * *",
    start_date=datetime(2022, 1, 1),
    catchup=False) as dag:

    # Error because of missing connection - this is how it should be
    first_task = MsSqlOperator(
        task_id="first_task",
        sql="testdag/testscript.sql")

    # Error because of template not found
    second_task = MsSqlOperator(
        task_id="second_task",
        sql="dags/testdag/testscript.sql")

    # When trying to open the file the path has to contain "dags" - why?
    with open("dags/testdag/testscript.sql", "r") as file:
        f = file.read()
        file.close()

    first_task
    second_task
CodePudding user response:
MsSqlOperator has sql as a templated field. This means that the Jinja engine will run on the string passed via the sql parameter. Moreover, it has .sql as a templated extension. This means that the operator knows to open the .sql file, read its content, and pass it through the Jinja engine before submitting it to the MsSQL database for execution. The behavior you are seeing is part of Airflow's power: you don't need to write code that reads the query from the file - Airflow does that for you. Airflow asks you only to provide the query string and the connection; the rest is on the operator to handle.
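Roughly, the mechanism looks like this (a minimal sketch with a hypothetical MySqlLikeOperator; the real MsSqlOperator ships these attributes in the provider package):

from airflow.models.baseoperator import BaseOperator

class MySqlLikeOperator(BaseOperator):
    # Jinja templating is applied to these attributes...
    template_fields = ("sql",)
    # ...and when the value ends with one of these extensions, Airflow
    # reads the file (resolved relative to the DAG folder or
    # template_searchpath) and templates its contents instead.
    template_ext = (".sql",)

    def __init__(self, *, sql, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql

    def execute(self, context):
        # By the time execute() runs, self.sql already holds the
        # rendered file contents, not the path.
        self.log.info("Rendered SQL: %s", self.sql)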
The:

second_task = MsSqlOperator(
    task_id="second_task",
    sql="dags/testdag/testscript.sql")
is throwing a "template not found" error since Airflow resolves files with templated extensions against paths relative to your DAG file, and this path is not relative to your DAG. If you want this path to be searched as well, use template_searchpath:
with DAG(
    ...,
    template_searchpath=["dags/testdag/"],
) as dag:
Then your operator can just have sql="testscript.sql".
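Put together, a sketch of the DAG from the question with template_searchpath applied (paths assume the repository layout above):

from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

with DAG(
    dag_id="testdag",
    schedule_interval="30 6 * * *",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    template_searchpath=["dags/testdag/"],
) as dag:
    first_task = MsSqlOperator(
        task_id="first_task",
        sql="testscript.sql",  # resolved via template_searchpath
    )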
As for the:

with open("dags/testdag/testscript.sql", "r") as file:
    f = file.read()
    file.close()
This practically does nothing useful. The file will be opened and read by the scheduler, because this is top-level code. Not only that - these lines will be executed roughly every 30 seconds (the default of min_file_process_interval), as Airflow periodically scans your .py files searching for DAG updates. Because that code runs in the scheduler process, the relative path is resolved against the scheduler's working directory rather than against the DAG file's folder - which should also answer your question of why dags/ is needed.
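If you do need to read the file yourself, a sketch that avoids both problems is to anchor the path to the DAG file and defer the read until a task actually runs:

from pathlib import Path

# Resolved relative to this DAG file, not the scheduler's working directory.
SQL_PATH = Path(__file__).parent / "testdag" / "testscript.sql"

def read_script():
    # Executed only when a task runs it, not on every DAG-file parse.
    return SQL_PATH.read_text()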
CodePudding user response:
Using template_searchpath will work as @Elad has mentioned, but note that it is DAG-specific.
To find files in Airflow without using template_searchpath, remember that everything Airflow runs starts in the $AIRFLOW_HOME directory (i.e. airflow by default, or wherever you're executing the services from). So either start there with all your imports, or reference them relative to the code file you're currently in (i.e. current_dir from my previous answer).
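That pattern presumably looks something like this (an assumption, since the previous answer is not shown here):

import os

# Resolve paths relative to this file so they work no matter where the
# scheduler or worker process was started from.
current_dir = os.path.dirname(os.path.abspath(__file__))
sql_path = os.path.join(current_dir, "testdag", "testscript.sql")

with open(sql_path, "r") as file:
    f = file.read()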
Setting Airflow up for the first time can be fiddly.