Not able to use Jinja template in PostgresOperator Airflow in SQL file naming


I have been searching for this issue, but all the answers I found are old and not relevant to my problem. I am trying to pass the sql parameter as a Jinja template, but it is not being rendered. I am fairly new to Airflow, so I am not sure whether Jinja templating works in the sql parameter of PostgresOperator at all. To be clear, I am not trying to use a Jinja template inside the SQL query or file; I am building the name of the SQL file with a Jinja template, and that name is not being rendered. If there is another way to pass it, please let me know. Here is the code.

PostgresOperator code:

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="mypostgres",
    # TODO investigate why this is not working
    sql="queries{{ execution_date.hour }}{{ execution_date.day }}.sql",
)

The templating works in BashOperator and PythonOperator, but not in PostgresOperator. The Airflow webserver UI says it is not able to render the template.

(Screenshot: PostgresOperator render output)

BashOperator code:

extract_gz = BashOperator(
    task_id="extract_downloaded_gz",
    bash_command="gunzip --force /opt/airflow/dags/wikipageviews{{ execution_date.hour }}{{ execution_date.day }}.gz",
)

For the BashOperator the same pattern works. (Screenshot: BashOperator render output)

UPDATE: One weird thing I noticed is that when I leave the .sql extension off the sql parameter, the template gets rendered properly. Here is the code, and a screenshot with the Jinja template rendered without the .sql extension, even though the BashOperator shown above renders fine with a file extension at the end.

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="mypostgres",
    # TODO investigate why this is not working
    sql="queries{{ execution_date.hour }}{{ execution_date.day }}",
)

(Screenshot: rendered output, template rendered successfully)

CodePudding user response:

If the sql parameter string ends with .sql, then you must pass it the path to a .sql file that contains the template (it will also accept a list of such paths). If you just want to pass the query itself in the string, it cannot end with .sql.
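For illustration, here is a rough sketch of the two accepted forms; the connection id mirrors the question, while the task ids, file name, table, and column are placeholders:

# Form 1: a path ending in .sql; the file's contents are templated, not its name.
run_from_file = PostgresOperator(
    task_id="run_from_file",
    postgres_conn_id="mypostgres",
    sql="queries.sql",
)

# Form 2: the query text itself, templated directly; it must not end with .sql.
run_inline = PostgresOperator(
    task_id="run_inline",
    postgres_conn_id="mypostgres",
    sql="SELECT * FROM pageview_counts WHERE hour = {{ execution_date.hour }};",
)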

If you want to use these templated fields to build the name of the file, I'd use a PythonOperator with a PostgresHook: get the context, build the filename, and use the hook to execute the queries.
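A minimal sketch of that idea, assuming the connection id, DAG folder path, and file naming pattern from the question (the function and task names are made up):

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def _run_hourly_queries(execution_date):
    # Airflow passes execution_date into the callable from the task context.
    filename = f"queries{execution_date.hour}{execution_date.day}.sql"
    hook = PostgresHook(postgres_conn_id="mypostgres")
    with open(f"/opt/airflow/dags/{filename}") as f:
        # hook.run accepts a single SQL string (or a list of statements).
        hook.run(f.read())


write_to_postgres = PythonOperator(
    task_id="write_to_postgres",
    python_callable=_run_hourly_queries,
)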

CodePudding user response:

This is one potential way to do it, and it is working for me.

import psycopg2
from airflow.exceptions import AirflowFailException


def _write_to_postgres(execution_date):
    # Build the file name from the execution date instead of templating it.
    filename = f"queries{execution_date.hour}{execution_date.day}.sql"
    try:
        conn = psycopg2.connect(
            database="airflow", user="airflow", password="airflow",
            host="postgres", port="5432",
        )
    except psycopg2.OperationalError as exc:
        raise AirflowFailException(f"Could not connect to Postgres: {exc}")
    else:
        cursor = conn.cursor()
        with open(f"/opt/airflow/dags/{filename}", "r") as f:
            # Assumes one SQL statement per line in the file.
            for statement in f.readlines():
                print(statement)
                cursor.execute(statement)
        conn.commit()
        cursor.close()
        conn.close()
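If it helps, the function above can then be wired up with a PythonOperator, which passes execution_date into the callable from the task context; the import path assumes Airflow 2:

from airflow.operators.python import PythonOperator

write_to_postgres = PythonOperator(
    task_id="write_to_postgres",
    python_callable=_write_to_postgres,
)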