I'm trying out Airflow with a basic ETL. I've put the .py file in the dags folder and brought up my Docker Compose stack, but looking at the task logs in Airflow I get "FileNotFoundError: [Errno 2] No such file or directory: 'Data.csv'".
ETLwithDBAirflow.py
import sqlalchemy
import pandas as pd
from datetime import datetime
from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

@task()
def get_src_tables():
    df = pd.read_csv('Data.csv')

@task()
def transform_src():
    df = df.fillna(df.mean(numeric_only=True))
    df = df.round()

@task()
def load_src():
    engine = sqlalchemy.create_engine("postgresql+psycopg2://postgres:root@localhost/testdb")
    df.to_sql(
        name='Data',
        con=engine,
        index=False,
        if_exists='append'
    )

# [START how_to_task_group]
with DAG(dag_id="data_etl_dag", schedule="@daily", start_date=datetime(2023, 1, 26), catchup=False, tags=["data_model"]) as dag:
    with TaskGroup("extract_model", tooltip="Extract and load source data") as extract_model:
        get_src_tables = get_src_tables()
        get_src_tables
    with TaskGroup("transform_model", tooltip="Transform and stage data") as transform_model:
        transform_src = transform_src()
        transform_src
    with TaskGroup("load_model", tooltip="Load the data into the database") as load_model:
        load_src = load_src()
        load_src
    extract_model >> transform_model >> load_model
I've tried putting `return` in each def and `return df` in the first one, but it didn't work. Could I get some help on this, and any suggestions to improve my code? Thanks in advance. This is my first attempt at writing Airflow DAGs with a very basic ETL; for the next one I think it would be cleaner to keep the DAG code in a separate .py file.
CodePudding user response:
So there's a twofold problem here. Any information you want to pass between tasks, referred to as XCom data, does need to be returned from the task, so go ahead and add those returns back in.
The second issue: you also have to explicitly receive the data from the XCom, since local variables are not shared between tasks. With the task-decorator (TaskFlow) style this is handled simply by declaring parameters in your task definitions. Here are docs with examples, specifically the second code block: https://docs.astronomer.io/learn/airflow-decorators
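For illustration, here's a minimal sketch of that pattern (hypothetical DAG and task names, Airflow 2.x TaskFlow API): the return value of one task is pushed to XCom, and passing the call result into the next task wires up the pull.

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2023, 1, 26), catchup=False)
def xcom_example():

    @task()
    def extract():
        # The return value is pushed to XCom automatically
        return {"a": 1, "b": 2}

    @task()
    def transform(data: dict):
        # `data` arrives as the upstream task's return value,
        # pulled from XCom by Airflow
        return {k: v * 10 for k, v in data.items()}

    transform(extract())

xcom_example()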
One caveat: XCom will likely serialize the pandas DataFrame on the way through (the default backend stores JSON), so in the downstream task you will probably need to explicitly rebuild it as a DataFrame.
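Put together, here's a rough sketch of how your DAG could be rewired (TaskGroups dropped for brevity; this assumes the default JSON XCom backend, so the DataFrame is round-tripped through to_json/read_json, and the connection string is copied from your code):

import io
import pandas as pd
import sqlalchemy
from datetime import datetime
from airflow.models.dag import DAG
from airflow.decorators import task

@task()
def get_src_tables():
    # Note: a relative path resolves against the worker's working
    # directory; inside Docker an absolute path is usually safer
    df = pd.read_csv('Data.csv')
    # A DataFrame is not JSON-serializable by the default XCom
    # backend, so push it as a JSON string instead
    return df.to_json()

@task()
def transform_src(raw: str):
    df = pd.read_json(io.StringIO(raw))
    df = df.fillna(df.mean(numeric_only=True))
    df = df.round()
    return df.to_json()

@task()
def load_src(staged: str):
    df = pd.read_json(io.StringIO(staged))
    engine = sqlalchemy.create_engine("postgresql+psycopg2://postgres:root@localhost/testdb")
    df.to_sql(name='Data', con=engine, index=False, if_exists='append')

with DAG(dag_id="data_etl_dag", schedule="@daily",
         start_date=datetime(2023, 1, 26), catchup=False,
         tags=["data_model"]) as dag:
    load_src(transform_src(get_src_tables()))

Chaining the calls this way also replaces the explicit extract_model >> transform_model >> load_model dependency, since the data flow defines the task order.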
Welcome to Airflow!