Home > Enterprise >  Passing Jinja variables in airflow params dict
Passing Jinja variables in airflow params dict

Time:06-28

I am trying to use templated params dict in SnwoflakeOperator. I have 2 tasks one to create a S3 object and then delete and replace the content of the table with s3 object content

execution_date = "{{ execution_date.int_timestamp | int }}"  # built in macro
task1 = PythonOperator(task_id='s3_create', 
          op_kwargs={'s3_key': f'{execution_date}_{file}'})

task_2 = SnowflakeOperator(task_id='load_data', 
            sql=["""copy into {{params.table}} from {{ params.stage }} 
                    files = ('{{ params.files }}') 
                    file_format = 'csv'
                 """]
            params={'table':'test_table', 
                    'files'=f'{execution_date}_{file}'}
)
task_1 >> task_2

It is not rendering as expected and have tried almost all formatting combinations.

Note: I cannot use S3ToSnowflakeOperator as I have do more than just copying the contents into the table.

CodePudding user response:

sql is enter image description here

I'm not sure this is the file name you wanted but you can change execution_date to any other macro you wish.

Edit: You can replace {{ execution_date }} with {{ execution_date.int_timestamp | int }} it will produce: enter image description here

  • Related