Requirement:
I am using 3 dataframes as below:
df1 - query result with the number of records in the table before data processing
df2 - query result with the number of records in the table after data processing
df - merge of df1 and df2
Code:
from datetime import datetime,timedelta
from airflow import DAG
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
import pendulum
import pandas as pd
import logging
local_tz = pendulum.timezone("my_time_zone")
local_time = pendulum.now("my_time_zone")
date = local_time.to_date_string()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
###setting default arguments
default_args = {
    'owner': 'owner',
    'start_date': datetime(2021, 1, 1, tzinfo=local_tz),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}
sf_hook = SnowflakeHook(snowflake_conn_id="my_conn_id")
sf_con = sf_hook.get_conn()
df1 = pd.DataFrame()  ## empty dataframe

def get_before_load_count(**context):
    global df1
    query1 = """SELECT 'att1' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE1
                UNION ALL
                SELECT 'att2' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE2
                UNION ALL
                SELECT 'att3' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE3
                UNION ALL
                SELECT 'att4' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE4
                UNION ALL
                SELECT 'att5' AS ATTRIBUTE,
                COUNT(*) AS RECORD_COUNT_BEFORE_LOAD FROM DB.SCHEMA.TABLE5"""
    df1 = df1.append(pd.read_sql(query1, sf_con), ignore_index=True)
    print("df1", df1)  ## I am able to see the record counts
def send_success_notification(**context):
    global df1
    query = """SELECT 'att1' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE1
               UNION ALL
               SELECT 'att2' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE2
               UNION ALL
               SELECT 'att3' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE3
               UNION ALL
               SELECT 'att4' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE4
               UNION ALL
               SELECT 'att5' AS ATTRIBUTE,
               COUNT(*) AS RECORD_COUNT_AFTER_LOAD FROM DB.SCHEMA.TABLE5"""
    df2 = pd.read_sql(query, sf_con)
    print("df2", df2)
    print("my df1", df1)
    df3 = pd.merge(df1, df2, on="ATTRIBUTE")
    df = df3.reindex(["ATTRIBUTE", "RECORD_COUNT_BEFORE_LOAD", "RECORD_COUNT_AFTER_LOAD"], axis=1)
    print("df", df)
    html_table = df.to_html(index=False, justify='center')
    op = EmailOperator(
        task_id='success_email',
        to='[email protected]',
        subject='Email subject ' + date,
        html_content="<p>Hi,<br><br>Process Completed<br><br> {}".format(html_table),
        dag=dag
    )
    op.execute(context)
with DAG('sample_dag', schedule_interval=None, max_active_runs=1, catchup=False, default_args=default_args) as dag:
    before_load = PythonOperator(
        task_id="before_load",
        python_callable=get_before_load_count,
        provide_context=True,
        dag=dag
    )
    load_data = SnowflakeOperator(
        task_id='load_sample_data',
        sql=['CALL procedure_name()'],
        snowflake_conn_id='sf_con_id',
        database='DB',
        schema='SCHEMA',
        warehouse='WAREHOUSE',
        role='ROLE',
        dag=dag
    )
    send_success_email = PythonOperator(
        task_id="send_success_email",
        python_callable=send_success_notification,
        provide_context=True,
        dag=dag
    )
    before_load >> load_data >> send_success_email
Issue: When I merge df1 and df2, the code throws a KeyError stating that df1 is empty.
Any help is appreciated!
CodePudding user response:
Each task in Airflow runs in a different process (and possibly on a different machine), so you cannot share data between tasks using global variables.
For small amounts of metadata you can use the built-in XCom: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
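A minimal sketch of the XCom approach for your case (the counts are tiny, so the default XCom backend should hold them; the queries, connection and task ids are the ones from your question, and only the parts that change are shown):

# Sketch: pass the small "before load" counts through XCom instead of a module-level global.
def get_before_load_count(**context):
    df1 = pd.read_sql(query1, sf_con)
    # The return value of a PythonOperator callable is pushed to XCom automatically,
    # so return something JSON-serializable instead of mutating a global dataframe.
    return df1.to_dict(orient="list")

def send_success_notification(**context):
    # Pull the counts pushed by the "before_load" task and rebuild the dataframe.
    before = context["ti"].xcom_pull(task_ids="before_load")
    df1 = pd.DataFrame(before)
    df2 = pd.read_sql(query, sf_con)
    df = pd.merge(df1, df2, on="ATTRIBUTE")
    # ... build the HTML table and send the email as before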
For bigger dataframes you have to store the data on external storage yourself (for example, export it to GCS/S3), or you can use a custom XCom backend (which also relies on some external backing storage).
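For example, with object storage the same pattern could look like the sketch below (an assumption-laden illustration: it presumes s3fs is installed so pandas can read/write s3:// paths directly, and the bucket name is a placeholder):

# Sketch: stage the dataframe on S3 between tasks; "my-staging-bucket" is a placeholder.
def get_before_load_count(**context):
    df1 = pd.read_sql(query1, sf_con)
    # Key the staged file by execution date so concurrent runs don't overwrite each other.
    df1.to_csv("s3://my-staging-bucket/before_load_{}.csv".format(context["ds"]), index=False)

def send_success_notification(**context):
    df1 = pd.read_csv("s3://my-staging-bucket/before_load_{}.csv".format(context["ds"]))
    df2 = pd.read_sql(query, sf_con)
    df = pd.merge(df1, df2, on="ATTRIBUTE")
    # ... continue as in the question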
CodePudding user response:
I would suggest flushing the data to disk (maybe as a CSV file) at a location accessible over the network by the Airflow workers, and having your task load it back into a dataframe. That will let you achieve what you intend easily.
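A rough sketch of that idea (the path below is a placeholder for a shared volume or NFS mount that every worker can reach; queries and connection are the ones from the question):

# Sketch: flush df1 to a CSV on shared storage in the first task, reload it in the second.
SHARED_PATH = "/shared/airflow/staging/before_load_counts.csv"  # placeholder path

def get_before_load_count(**context):
    df1 = pd.read_sql(query1, sf_con)
    df1.to_csv(SHARED_PATH, index=False)

def send_success_notification(**context):
    df1 = pd.read_csv(SHARED_PATH)
    df2 = pd.read_sql(query, sf_con)
    df3 = pd.merge(df1, df2, on="ATTRIBUTE")
    # ... reindex the columns, render with to_html and send the email as in the question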