Unable to make prediction with Sklearn model on pyspark dataframe

Time:03-18

I have loaded a sklearn model successfully, but I am unable to make predictions on a PySpark DataFrame. Running the code below raises the error shown after it. How can I make predictions with a sklearn model on a PySpark DataFrame? I have searched related questions but could not find a solution.

sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
braodcast_model.value


#update prediction method
def predictor(cols):
    #call predict method for model
    return model.value.predict(*cols)

udf_predictor = udf(predictor, FloatType())

#apply the udf to dataframe
df_prediction = df.withColumn("prediction", udf_predictor(df.select(list_of_columns)))

I get the following error message:

TypeError: Invalid argument, not a string or column. For column literals, use 'lit', 'array',
'struct' or 'create_map' function.

Answer:

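You were on the right track. The error occurs because df.select(list_of_columns) returns a DataFrame, whereas a UDF call expects column names or Column objects as its arguments.
I found two possible solutions to this problem: one uses a Spark UDF, the other uses a pandas UDF.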


Spark UDF

from pyspark.sql.functions import udf

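@udf('integer')
def predict_udf(*cols):
    # cols holds one row; wrap it so predict receives shape (1, n_features),
    # then take the single prediction and cast it to a plain Python int
    return int(braodcast_model.value.predict((cols,))[0])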

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))
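The row is wrapped as (cols,) because predict expects a 2-D input of shape (n_samples, n_features), while the UDF receives the values of a single row as a flat tuple. The following is a minimal local sketch of that shape handling with no Spark involved; toy_model and predict_row are hypothetical names used only for illustration, and the four-row dataset is made up.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

# Toy model on two features; it stands in for the broadcast model (assumption).
X = np.array([[0.0, 0.1], [0.2, 0.0], [1.0, 0.9], [0.9, 1.1]])
y = np.array([0, 0, 1, 1])
toy_model = LogisticRegression().fit(X, y)

def predict_row(*cols):
    # cols is a flat tuple with one row's values; (cols,) has shape (1, n_features)
    return int(toy_model.predict((cols,))[0])

# Row-by-row predictions, as a row-wise UDF would compute them
row_preds = [predict_row(*row) for row in X.tolist()]

# The same predictions computed in one batch call, for comparison
batch_preds = toy_model.predict(X).tolist()
```

Calling predict once per row is simple but pays the call overhead for every record, which is the main reason to consider the batch-oriented variant.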

Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('integer')
def predict_pandas_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(braodcast_model.value.predict(X))

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_pandas_udf(*list_of_columns))
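In the pandas variant, the function receives one pandas Series per column for a whole batch of rows, and pd.concat(..., axis=1) reassembles them into a feature table with one row per record. A minimal local sketch of that step, again without Spark; toy_model, predict_batch and the column names f1 and f2 are hypothetical and chosen only for illustration.

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Made-up feature table and labels; the model stands in for the broadcast model.
features = pd.DataFrame({"f1": [0.0, 0.2, 1.0, 0.9], "f2": [0.1, 0.0, 0.9, 1.1]})
labels = pd.Series([0, 0, 1, 1])
toy_model = LogisticRegression().fit(features, labels)

def predict_batch(*cols):
    # Each element of cols is a pandas Series holding one column of the batch
    X = pd.concat(cols, axis=1)
    # One predict call for the whole batch; the result has one entry per row
    return pd.Series(toy_model.predict(X))

batch_preds = predict_batch(features["f1"], features["f2"])
```

The returned Series must have the same length as the input batch, which holds here because predict returns one label per row.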

Reproducible example

Here I used a Databricks Community cluster with Spark 3.1.2, pandas==1.2.4 and pyarrow==4.0.0.
The broadcast model (braodcast_model) is a simple scikit-learn pipeline, standardization followed by logistic regression, trained on the breast cancer dataset.

import pandas as pd
import joblib
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pyspark.sql.functions import udf, pandas_udf


# load dataset
X, y = load_breast_cancer(return_X_y=True, as_frame=True)

# split in training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=28)

# create a small pipeline with standardization and model, then fit it
pipe = make_pipeline(StandardScaler(), LogisticRegression())
model = pipe.fit(X_train, y_train)

# save and reload the fitted model
path = '/databricks/driver/test_model.joblib'
joblib.dump(model, path)
loaded_model = joblib.load(path)

# sample of unseen data
df = spark.createDataFrame(X_test.sample(50, random_state=42))

# create broadcasted model
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)

I then applied the two methods shown above; the resulting df_prediction is the same in both cases.
