I have successfully loaded a scikit-learn model, but I am unable to make predictions on a PySpark DataFrame. Running the code below raises the error shown after it. How can I make predictions with a scikit-learn model on a PySpark DataFrame? I have searched related questions but could not find a solution.
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
braodcast_model.value
# update prediction method
def predictor(cols):
    # call predict method for model
    return model.value.predict(*cols)
udf_predictor = udf(predictor, FloatType())
#apply the udf to dataframe
df_prediction = df.withColumn("prediction", udf_predictor(df.select(list_of_columns)))
I get the following error message:
TypeError: Invalid argument, not a string or column. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
CodePudding user response:
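You were on the right track. The error occurs because udf_predictor is called with a DataFrame (the result of df.select(list_of_columns)), whereas a UDF call accepts only column names or Column objects. Note also that the function body refers to model, while the broadcast variable is named braodcast_model.
I found two possible solutions to this problem: one uses a Spark UDF, the other a Pandas UDF.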
Spark UDF
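from pyspark.sql.functions import udf

@udf('integer')
def predict_udf(*cols):
    # cols holds the feature values of a single row; wrapping it in a list
    # gives scikit-learn a 2D input of shape (1, n_features)
    return int(braodcast_model.value.predict([cols])[0])

# pass the column names unpacked, not a DataFrame
list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))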
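The row-wise UDF above receives one row's values as a tuple and wraps it in an outer container before calling predict. The reason is that scikit-learn estimators expect a 2D input of shape (n_samples, n_features). The following is a minimal sketch of that shape difference using NumPy only; the three feature values are made up for illustration.

```python
import numpy as np

# Hypothetical feature values for a single row (not taken from the dataset).
cols = (5.1, 3.5, 1.4)

# Wrapping the tuple in a list yields one sample with three features.
row = np.asarray([cols])
print(row.shape)

# Without the wrapper the values form a 1D array, which scikit-learn
# does not accept as a batch of samples.
flat = np.asarray(cols)
print(flat.shape)
```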
Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('integer')
def predict_pandas_udf(*cols):
    # cols is one pandas Series per column, each covering a batch of rows
    X = pd.concat(cols, axis=1)
    return pd.Series(braodcast_model.value.predict(X))

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_pandas_udf(*list_of_columns))
Reproducible example
Here I used a Databricks Community cluster with Spark 3.1.2, pandas==1.2.4 and pyarrow==4.0.0. The broadcast variable braodcast_model holds a simple logistic regression pipeline from scikit-learn, trained on the breast cancer dataset.
import pandas as pd
import joblib
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pyspark.sql.functions import udf, pandas_udf
# load dataset
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
# split in training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=28)
# create a small pipeline with standardization and model, then fit it
pipe = make_pipeline(StandardScaler(), LogisticRegression())
pipe.fit(X_train, y_train)
# save and reload the fitted pipeline
path = '/databricks/driver/test_model.joblib'
joblib.dump(pipe, path)
loaded_model = joblib.load(path)
# sample of unseen data
df = spark.createDataFrame(X_test.sample(50, random_state=42))
# create broadcasted model
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
Then I applied the two methods illustrated above; the resulting df_prediction is the same in both cases.
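The agreement between the two approaches can also be checked locally without a Spark session. The sketch below is an approximation under stated assumptions: predict_row and predict_batch are hypothetical plain-Python stand-ins for the bodies of the two UDFs, the model is used directly instead of through a broadcast variable, and the save/reload step is skipped. Recent scikit-learn versions may warn about missing feature names in the row-wise call.

```python
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Same data split and pipeline as in the reproducible example.
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=28
)
pipe = make_pipeline(StandardScaler(), LogisticRegression())
pipe.fit(X_train, y_train)
sample = X_test.sample(50, random_state=42)


def predict_row(*cols):
    # Stand-in for the Spark UDF body: one row of values in, one int out.
    return int(pipe.predict([cols])[0])


def predict_batch(*cols):
    # Stand-in for the Pandas UDF body: one Series per column in,
    # one Series of predictions out.
    features = pd.concat(cols, axis=1)
    return pd.Series(pipe.predict(features))


# Row-at-a-time predictions, as the Spark UDF would produce them.
row_preds = [predict_row(*row) for row in sample.itertuples(index=False, name=None)]

# Batched predictions, as the Pandas UDF would produce them.
batch_preds = predict_batch(*[sample[c] for c in sample.columns]).tolist()

print(row_preds == batch_preds)
```

On a real cluster the Pandas UDF is usually the faster of the two, since the model is called once per batch rather than once per row.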