Airflow and Templates reference and PostgresHook

Time:01-19

I have a question about the Templates reference, specifically {{ds}}. When the template is substituted in PostgresOperator, everything works (as far as I can tell). With PostgresHook, however, it does not work:

def prc_mymys_update(procedure: str, type_agg: str):
    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            with open(URL_YML_2,"r", encoding="utf-8") as f:
                ya_2 = yaml.safe_load(f)
                yml_mymts_2 = ya_2['type_agg']
                query_pg = ""
                if yml_mymts_2[0]['type_agg_name'] == "day" and type_agg == "day":
                    sql_1 = yml_mymts_2[0]['sql']
                    query_pg = f"""{sql_1}"""
                elif yml_mymts_2[1]['type_agg_name'] == "retention" and type_agg == "retention":
                    sql_2 = yml_mymts_2[1]['sql']
                    query_pg = f"""{sql_2}"""
                elif yml_mymts_2[2]['type_agg_name'] == "mau" and type_agg == "mau":
                    sql_3 = yml_mymts_2[2]['sql']
                    query_pg = f"""{sql_3}"""
                cur.execute(query_pg)
                dates_list = cur.fetchall()
                for date_res in dates_list:
                    cur.execute(
                        "select from {}(%(date)s::date);".format(procedure),
                        {"date": date_res[0].strftime("%Y-%m-%d")},
                    )
    conn.close()

The SQL comes from a YAML file:

type_agg:
  - type_agg_name: day
    sql: select calendar_date from entertainment_dds.v_calendar where calendar_date between '{{ds}}'::date - interval '7 days' and '{{ds}}'::date - 1 order by 1 desc
  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
  - type_agg_name: mau
    sql: select dt::date date_ from generate_series('{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '7 days', '{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '1 days', interval '1 days') dt order by 1 asc

When I run the DAG and it reaches the task that uses this entry:

  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

I get this error:

psycopg2.errors.UndefinedColumn: column "y" does not exist LINE 1: ...((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}...

I tried to find information on how the Templates reference interacts with PostgresHook, but found nothing:

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference

CodePudding user response:

This is expected. template_fields is an attribute of the BaseOperator in Airflow, from which all operators inherit. This is why passing in a Jinja expression when using the PostgresOperator works just fine: the operator renders its templated fields before it executes. A hook does no such rendering.

If you need to write a custom task, you need to render the template values explicitly. Something like this (untested, but it should be adaptable to your function):

def prc_mymys_update(procedure: str, type_agg: str, ti):
    ti.render_templates()

    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            ...

The ti kwarg represents the Airflow Task Instance and is directly accessible as part of the execution context pushed to every task in Airflow. That object has a render_templates() method, which renders the Jinja expressions in the task's templated fields into values.
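
For SQL that is only loaded inside the function (like the YAML file in the question), another way to render explicitly is to pass the string through Jinja yourself, using values from the context. A minimal sketch, also not tested against Airflow: it uses jinja2 directly (the engine Airflow templates are built on), and SQL_TEMPLATE, render_sql and example_context are hypothetical names with made-up values. In a real task, ds and execution_date would come from the execution context instead.

```python
from datetime import datetime

from jinja2 import Template

# Hypothetical stand-in for the SQL string loaded from the YAML file.
# Note the double quotes around the date format inside the Jinja expression.
SQL_TEMPLATE = (
    "select dt::date from generate_series("
    "'{{ execution_date.strftime(\"%Y-%m-%d\") }}'::date - interval '7 days', "
    "'{{ ds }}'::date - interval '1 days', interval '1 days') dt order by 1 asc"
)


def render_sql(sql_template: str, context: dict) -> str:
    """Render the Jinja expressions in a SQL string using values from a context dict."""
    return Template(sql_template).render(**context)


# Hypothetical values; in a real task they would be taken from the Airflow context.
example_context = {
    "ds": "2023-01-19",
    "execution_date": datetime(2023, 1, 19),
}

rendered_sql = render_sql(SQL_TEMPLATE, example_context)
print(rendered_sql)
```

The rendered string contains plain date literals and no Jinja braces, so it can be handed to cur.execute() as ordinary SQL.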

If the PostgresOperator doesn't fit your needs you can always subclass the operator and tailor it accordingly.

Also, the SQL string itself nests single quotes inside single quotes, which causes the string parsing issue you're seeing: '{{execution_date.strftime('%Y-%m-%d')}}' should be something like '{{execution_date.strftime("%Y-%m-%d")}}'

CodePudding user response:

Note the single quotes in the following query:

sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc

Specifically, this part:

'{{execution_date.strftime('%Y-%m-%d')}}'

You have two separate strings here, separated by the date format. Here's the first string:

'{{execution_date.strftime('

This causes the date format to be parsed separately, outside of any string. If you wrap the date format in double quotes instead of single quotes, it should resolve this error. For example:

sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime("%Y-%m-%d")}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime("%Y-%m-%d")}}'::date) , '1 month'::interval) t(date) order by 1 asc

Note that you might need to swap the double and single quotes if double quotes in the RDBMS are used for other purposes, for example:

"{{execution_date.strftime('%Y-%m-%d')}}"
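
To see the split described above, here is a rough sketch that pulls the single-quoted literals out of the unrendered fragment with a regular expression. It is a simplification, not how PostgreSQL actually tokenizes SQL: it assumes there are no escaped quotes ('') or dollar-quoted strings, which is true for this fragment. The variable names are just for illustration.

```python
import re

# The unrendered fragment from the question, as the database receives it.
fragment = "'{{execution_date.strftime('%Y-%m-%d')}}'"

# Simplified view of a SQL string literal: everything between two single quotes.
# Assumption: no escaped quotes ('') or dollar-quoting, neither of which occurs here.
literal_pattern = r"'[^']*'"

literals = re.findall(literal_pattern, fragment)
leftover = re.sub(literal_pattern, " ", fragment).strip()

print(literals)  # the two separate string literals
print(leftover)  # what remains outside any string
```

What is left outside the strings is %Y-%m-%d, which the database reads as operators and identifiers. Unquoted identifiers are folded to lowercase in PostgreSQL, which is why the error complains about a column named "y".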