I have been searching for this issue, but all the answers are old and not relevant to my case.
I am trying to pass the sql parameter as a Jinja template, but it is not being rendered. I am a bit new to Airflow; here is the code. I am not sure whether Jinja templating works in the sql parameter of PostgresOperator or not. If there is any other way to pass it, please let me know. To be clear, I am not trying to use a Jinja template inside the SQL query or file; I am passing the name of the SQL file using a Jinja template, and that is what it is not able to render.
PostgresOperator code:
write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="mypostgres",
    # TODO investigate why this is not working
    sql="queries{{ execution_date.hour }}{{ execution_date.day }}.sql",
)
The templating works in BashOperator and PythonOperator, but not in PostgresOperator. The Airflow webserver UI says it is not able to render the template.
BashOperator code:
extract_gz = BashOperator(
    task_id="extract_downloaded_gz",
    bash_command="gunzip --force /opt/airflow/dags/wikipageviews{{ execution_date.hour }}{{ execution_date.day }}.gz",
)
As shown above, the same style of template works fine in the BashOperator.
UPDATE:
One thing I noticed is that when I do not put .sql in the sql parameter, the template gets rendered properly, which seems weird. Below is the version without the .sql extension in the sql parameter, which renders correctly; yet the BashOperator works fine with .sql at the end, as you can see in the code above.
write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="mypostgres",
    # TODO investigate why this is not working
    sql="queries{{ execution_date.hour }}{{ execution_date.day }}",
)
CodePudding user response:
If the sql parameter string ends with .sql, then you must pass it a path to a .sql file that contains the template. It will also take a list of paths. If you just want to pass the query itself in the string, then it can't end with .sql.
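To illustrate the difference, here is a sketch of passing an inline query string that does not end in .sql; the table name my_table is just a placeholder:

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="mypostgres",
    # inline SQL: rendered and executed directly because the string does not end in .sql
    sql="SELECT * FROM my_table WHERE hour = {{ execution_date.hour }}",
)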
If you want to use these templated fields to build the name of the files, I'd use a PythonOperator with a PostgresHook. Get the context, build the filenames, and use the hook to execute the queries.
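A minimal sketch of that approach could look like this; the connection id, filename pattern, and /opt/airflow/dags path are taken from the question, the rest is an assumption:

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def _run_queries(execution_date, **_):
    # build the filename from the context instead of templating the sql field
    filename = f"queries{execution_date.hour}{execution_date.day}.sql"
    hook = PostgresHook(postgres_conn_id="mypostgres")
    # read the file and let the hook execute its contents
    with open(f"/opt/airflow/dags/{filename}") as f:
        hook.run(f.read())

write_to_postgres = PythonOperator(
    task_id="write_to_postgres",
    python_callable=_run_queries,
)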
CodePudding user response:
This is one potential way to do it, and it is working for me.
from airflow.exceptions import AirflowFailException
import psycopg2

def _write_to_postgres(execution_date):
    hour = execution_date.hour
    day = execution_date.day
    filename = f"queries{hour}{day}.sql"
    try:
        conn = psycopg2.connect(database="airflow", user="airflow", password="airflow", host="postgres", port="5432")
    except Exception:
        # fail the task if the connection cannot be established
        raise AirflowFailException
    else:
        cursor = conn.cursor()
        # execute each statement from the templated file, one per line
        with open(f"/opt/airflow/dags/{filename}", "r") as f:
            for statement in f.readlines():
                print(statement)
                cursor.execute(statement)
        conn.commit()
        cursor.close()
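Wiring this callable into the DAG could then be done with a plain PythonOperator; this is just a sketch of how I would hook it up:

from airflow.operators.python import PythonOperator

write_to_postgres = PythonOperator(
    task_id="write_to_postgres",
    python_callable=_write_to_postgres,
)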