We are using Airflow 2.1.4 running on Kubernetes.
We have separate pods for the web server and the scheduler, and we are using the Kubernetes executor.
We use a variety of operators, such as PythonOperator, KubernetesPodOperator, etc.
Our setup handles ~2K customers (businesses), and each one of them has its own DAG.
Our code looks something like:
def get_customers():
    logger.info("querying database to get all customers")
    return sql_connection.query("SELECT id, name, offset FROM customers_table")

customers = get_customers()
for id, name, offset in customers:
    dag = DAG(
        dag_id=f"{id}-{name}",
        schedule_interval=offset,
    )
    with dag:
        first = PythonOperator(...)
        second = KubernetesPodOperator(...)
        third = SimpleHttpOperator(...)
        first >> second >> third
    globals()[id] = dag
The snippet above is a simplified version of what we've got; in reality each DAG has a few dozen operators, not just three.
The problem is that for each operator in each DAG we see the "querying database to get all customers" log, which means we query the database far more often than we want to. The database isn't updated frequently, and updating the DAGs once or twice a day would be enough. I know that the DAGs are saved in the metadata database, or something along those lines.
- Is there a way to build those DAGs only once, via the scheduler, and not once per operator?
- Should we change the design to support our multi-tenancy requirement? Is there a better option?
In our case, ~60 operators × ~2,000 customers = ~120,000 queries to the database.
CodePudding user response:
Yes, this is entirely expected. DAG files are parsed by Airflow regularly (every 30 seconds by default), so any top-level code (code executed while the file is being parsed, rather than inside the "execute" methods of operators) runs on every parse.
The simple answer (and best practice) is: do not perform any heavy operations in the top-level code of your DAGs, and in particular, do not run DB queries there. If you want more specific answers and possible ways to handle it, there are dedicated chapters about this in the Airflow best-practices documentation:
This one explains why top-level code should be "light": https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
This one covers strategies to avoid "heavy" operations in top-level code when you generate DAGs dynamically, as you do in your case: https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#dynamic-dag-generation
In short, there are three proposed approaches:
- using environment variables
- generating a configuration file (for example .json) from your DB automatically (and periodically) with an external script, putting it next to your DAG, and having the DAG read that JSON file instead of running an SQL query
- generating the many DAG Python files themselves dynamically (for example using Jinja), also automatically and periodically, using an external script
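To make the second approach concrete, here is a minimal sketch (the file name and helper names are my own, not from the Airflow docs): an external script, run once or twice a day, dumps the customers table to a JSON file next to the DAG file, and the DAG file then only does a cheap local read on every parse instead of a DB round-trip.

```python
import json
from pathlib import Path

# Hypothetical config file living next to the DAG file in the DAGs folder.
CONFIG = Path("customers.json")

def refresh_config(rows):
    """External-script side: dump query results to JSON periodically.

    `rows` would come from: SELECT id, name, offset FROM customers_table
    """
    CONFIG.write_text(json.dumps(
        [{"id": i, "name": n, "offset": o} for i, n, o in rows]
    ))

def load_customers():
    """DAG-file side: read the local JSON instead of querying the DB."""
    return json.loads(CONFIG.read_text())

# The DAG file's top-level loop then becomes:
# for c in load_customers():
#     dag = DAG(dag_id=f"{c['id']}-{c['name']}", schedule_interval=c["offset"])
#     ...
```

The parse loop still runs every 30 seconds, but it now costs one small file read instead of one SQL query per parse.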
I believe either the second or the third approach would achieve your goal.
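For the third approach, the docs suggest Jinja for templating; the sketch below uses Python's built-in string.Template as a dependency-free stand-in to show the idea (all names here are illustrative). An external script would run this periodically and write one generated .py file per customer into the DAGs folder, so each DAG file is fully static at parse time.

```python
from string import Template

# Template for one customer's DAG file. The airflow imports inside the
# template are only text here; they run when Airflow parses the
# generated file, not when this script renders it.
DAG_TEMPLATE = Template('''\
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id="$dag_id", schedule_interval="$offset") as dag:
    ...  # the ~60 operators wired up here
''')

def render_dag_file(customer_id, name, offset):
    """Render the DAG source for one customer (to be written to disk)."""
    return DAG_TEMPLATE.substitute(
        dag_id=f"{customer_id}-{name}", offset=offset
    )

# External-script side (hypothetical):
# for cid, name, offset in rows_from_db:
#     Path(dags_dir, f"{cid}-{name}.py").write_text(
#         render_dag_file(cid, name, offset))
```

Since the files are regenerated only when the script runs (once or twice a day, matching how often your customer data changes), the 120,000 per-parse queries collapse into a single query per regeneration.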