Home > Software design >  How to pass pandas dataframe to airflow tasks
How to pass pandas dataframe to airflow tasks

Time:11-10

I'm learning how to use airflow to build machine learning pipeline.

But didn't find a way to pass pandas dataframe generated from 1 task into another task... It seems that need to convert the data to JSON format or save the data in database within each task?

Finally, I had to put everything in 1 task... Is there anyway to pass dataframe between airflow tasks?

Here's my code:

from datetime import datetime
import pandas as pd
import numpy as np
import os

import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score

from airflow.decorators import dag, task
from airflow.operators.python_operator import PythonOperator


@dag(dag_id='super_mini_pipeline', schedule_interval=None, 
 start_date=datetime(2021, 11, 5), catchup=False, tags=['ml_pipeline'])
def baseline_pipeline():

    def all_in_one(label):
        path_to_csv = os.path.join('~/airflow/data','leaf.csv') 
        df = pd.read_csv(path_to_csv)

        y = df[label]
        X = df.drop(label, axis=1)

        folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
        lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
        metrics_lst = []

        for train_idx, val_idx in folds.split(X, y):
            X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
            X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
        
            lgbm.fit(X_train, y_train)
            y_pred = lgbm.predict(X_val)
        
            cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
            metrics_lst.append(cv_balanced_accuracy)
    
        avg_performance = np.mean(metrics_lst)

        print(f"Avg Performance: {avg_performance}")


    all_in_one_task = PythonOperator(task_id='all_in_one_task', python_callable=all_in_one, op_kwargs={'label':'species'})
    all_in_one_task 


# dag invocation
pipeline_dag = baseline_pipeline()

CodePudding user response:

Although it is used in many ETL tasks, Airflow is not the right choice for that kind of operations, it is intended for workflow not dataflow. But there are many ways to do that without passing the whole dataframe between tasks.

You can pass information about the data using xcom.push and xcom.pull:

a. Save the outcome of the first task somewhere (json, csv, etc.)

b. Pass to xcom.push information about saved file. E.g. file name, path.

c. Read this filename using xcom.pull from the other task and perform needed operation.

Or:

Everything above using some database tables:

a. In task_1 you can download data from table_1 in some dataframe, process it and save in another table_2 (df.to_sql()).

b. Pass the name of the table using xcom.push.

c. From the other task get table_2 using xcom.pull and read it with df.read_sql().

Information on how to use xcom you can get from airflow examples. Example: https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py

IMHO there are many other better ways, I have just written what I tried.

CodePudding user response:

Completely agree with @Talgat that Airflow is not really built for this. It focuses on task-dependencies rather than data-dependencies.

Perhaps you can look at a data-focused pipeline-ing solution like ZenML to solve this problem? It has a guide with examples off passing Pandas Dataframes across pipeline steps. You can also leverage data caching across steps and other features that make it more suited to what you're doing.

On top, a ZenML pipeline is also deploy-able as an Airflow DAG. So rather than focusing on writing the persisting of artifact logic yourself, you can just let ZenML handle it.

Disclaimer: I am one of the core contributors of ZenML, so this is admittedly biased. Still thought it might be helpful for the OP!

  • Related