My code looks like the snippet below. I want the three SQL files in the list to be executed sequentially within a single BigQueryInsertJobOperator task.
This approach only executes the first SQL file. Is there an alternative approach to solve this problem?
```python
t1 = BigQueryInsertJobOperator(
    task_id='data_load',  # task IDs may not contain spaces
    configuration={
        "query": {
            "query": "{% include ['sqlfile_1.sql', 'sqlfile_2.sql', 'sqlfile_3.sql'] %}",
            "useLegacySql": False,
        }
    },
    gcp_conn_id='bq_conn')
```
Answer:
I can't find a way to run multiple SQL files under one task ID. When Jinja's `{% include %}` is given a list, it includes only the first template in the list that exists, which is why only `sqlfile_1.sql` runs. As an alternative, you could loop over the SQL files and create one task per file, with the task ID derived from each file name, all within the same DAG.
Please see the sample code below:
```python
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

PROJECT_ID = "your-project-id"
DATASET_NAME = "your-dataset"
TABLE_1 = "your-table"
dag_id = "your-dag-id"

# SQL files, resolved relative to the DAG folder (or template_searchpath).
sql_files = [
    "my-query1.sql",
    "my-query2.sql",
    "my-query3.sql",
]

with models.DAG(
    dag_id,
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1},
) as dag:
    previous_task = None
    for sqlrun in sql_files:
        # Use the file name without its extension as the task ID, e.g. "my-query1".
        load_task = BigQueryInsertJobOperator(
            task_id=sqlrun.split(".")[0],
            configuration={
                "query": {
                    # A templated value ending in ".sql" is loaded and rendered as a template file.
                    "query": sqlrun,
                    "useLegacySql": False,
                }
            },
        )
        # Chain the tasks so the files run one after another instead of in parallel.
        if previous_task is not None:
            previous_task >> load_task
        previous_task = load_task
```
In addition, per the BigQuery API documentation, the `query` field accepts only a single STRING, which is then parsed as the query.
Answer:
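Running multiple SQL files in one task is not recommended!
However, BigQuery can execute multiple SQL statements in a single request (a multi-statement query) as long as the statements are separated by semicolons.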
sqlfile_1.sql:
```sql
SELECT *
FROM table_1
```
sqlfile_2.sql:
```sql
SELECT *
FROM table_2
```
Concatenate the SQL files into one, ending each statement with `;`:
sqlfiles.sql:
```sql
SELECT *
FROM table_1;
SELECT *
FROM table_2;
```
(I'm not sure whether this works in your setup, but BigQuery itself accepts it.)
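The manual concatenation above can also be done in the DAG file. The code below is a minimal sketch, assuming each .sql file holds exactly one statement and does not end with a `--` line comment. `concat_sql_files` is a hypothetical helper, not an Airflow or BigQuery API.

```python
from pathlib import Path
from typing import Iterable


def concat_sql_files(paths: Iterable[str]) -> str:
    """Join several .sql files into one multi-statement BigQuery script.

    Hypothetical helper. Assumes one statement per file and no trailing
    line comment (a '--' comment would swallow the appended ';').
    """
    statements = []
    for path in paths:
        # Drop surrounding whitespace and any trailing semicolons so each
        # statement ends up terminated by exactly one ';'.
        sql = Path(path).read_text().strip().rstrip(";").strip()
        if sql:  # skip empty files rather than emitting a bare ';'
            statements.append(sql + ";")
    return "\n".join(statements)
```

The returned string could then be passed as the query, e.g. `configuration={"query": {"query": concat_sql_files(paths), "useLegacySql": False}}`. This reads the files whenever the DAG file is parsed. Relative paths also resolve against the process's working directory rather than the DAG folder, so building paths from `Path(__file__).parent` is safer.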