Home > Net >  Preprocessing data before inference in Batch Transform SageMaker
Preprocessing data before inference in Batch Transform SageMaker

Time:01-02

Good afternoon,

I am trying to use a recently trained model on SageMaker to do Batch inference. I have the dataset converted in json format. According to the course I am doing, you should have four functions into "serve.py", then the Session() and the model are created to finally feed the data with model.transformer and .transform(...). The data in the json file is not preprocessed as it is trying to mock real life data, while the same data was preprocessed (removed certain columns, onehot encoding and Scaler) in the training step. Therefore when passing the data it needs data processed to match the data used on training. This preprocessing steps were supposed to be included in the input_fn fuction, but when trying to do the inference it received an error:

ValueError( sagemaker_containers._errors.ClientError: X has 37 features, but ColumnTransformer is expecting 16 features as input<

The data in fact has 16 features plus the class column.

I have tried:

  • Changing strategy="MultiRecord" to SingleRecord but the error states that "X has 8 features, but ColumnTransformer is expecting 16"
  • Removing the preprocessing steps from input_fn completely, but as expected it fails when it finds non vectorized data.

The entire code I am using can be found next. Please know that it works fine with Serverless inference

%%writefile serve.py

import os
import joblib
import pandas as pd
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer

def model_fn(model_dir):
    """Load and return the model"""
    model_file_name = "pipeline_model.joblib"
    pipeline_model = joblib.load(os.path.join(model_dir, model_file_name))
    
    return pipeline_model
      
def input_fn(request_body, request_content_type):
    """Process the input json data and return the processed data.
    You can also add any input data pre-processing in this function
    """
    if request_content_type == "application/json":
        input_object = pd.read_json(request_body, lines=True)
       #print(input_object.shape)
       #print(input_object.head())
        cat_cols = ["job", "marital", "education", "default", "housing", "loan", "month", "poutcome"]
        cont_cols = ["age", "pdays", "previous", "emp_var_rate","cons_price_idx",  "cons_conf_idx",  "euribor3m","nr_employed"]
        input_object.drop(["y"],axis=1, inplace=True)
       #
       #
       ## One hot encode the categorical columns
        ohe = OneHotEncoder(drop="first")
       ## Scale the continuous columns
        sc = StandardScaler()

       ## Column transformer to apply transformations on both categorical and continuous columns
        ct = ColumnTransformer([
           ("One Hot Encoding", ohe, cat_cols),
           ("Scaling", sc, cont_cols)
        ])
       #      
       ###correct
        input_object = ct.fit_transform(input_object)
        
        return input_object
    else:
        raise ValueError("Only application/json content type supported!")  

def predict_fn(input_object, pipeline_model):
    """Make predictions on processed input data"""
    predictions = pipeline_model.predict(input_object)
    pred_probs = pipeline_model.predict_proba(input_object)
    
    prediction_object = pd.DataFrame(
        {
            "prediction": predictions.tolist(),
            "pred_prob_class0": pred_probs[:, 0].tolist(),
            "pred_prob_class1": pred_probs[:, 1].tolist()
        }
    )
    
    return prediction_object

def output_fn(prediction_object, request_content_type):
    """Post process the predictions and return as json"""
    return_object = prediction_object.to_json(orient="records", lines=True)
    
    return return_object

# Create the deployment - same as Real Time Inference Code!
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import Session, get_execution_role

session = Session()
bucket = session.default_bucket()

training_job_name = "rfc-pipeline-tuner-221223-1512-009-d5b7f868" # TODO: Update with best TrainingJobName from hyperparameter tuning
model_artifact = f"s3://{bucket}/{training_job_name}/output/model.tar.gz"
endpoint_name = "bank-prediction-rfc-pipeline-batch-transform"

model = SKLearnModel(
    name=endpoint_name,
    framework_version="1.0-1",
    entry_point="serve.py",
    dependencies=["requirements.txt"],
    model_data=model_artifact,
    role=get_execution_role(),
    sagemaker_session = session
)

# Create a batch transformer from the base model
output_path = f"s3://{bucket}/sagemaker/bank-prediction/test_preds"
batch_transformer = model.transformer(instance_count=1, 
                                      instance_type="ml.m5.xlarge",
                                      strategy="MultiRecord",
                                      accept="application/json",
                                      assemble_with="Line", 
                                      output_path=output_path)
%%time
# Feed the test data
test_data_path = "s3://sagemaker-eu-west-2-262713471428/sagemaker/bank-prediction1/bigtest.json" 
batch_transformer.transform(test_data_path, data_type="S3Prefix", content_type="application/json", split_type="Line")

CodePudding user response:

So the issue was that one of the columns had an extra category not seen during training and the model was not able to vectorize it.

The data is fed without processing and the model on SageMaker performs the vectorization and the transformation necessary based on the training process so that it fits the trained model. No need to add preprocessing steps such as Standarization and OneHotEncoder in the inference step as I was trying to do. I solved my issue by removing the row containing the extra category (which was a simple yes) and feeding the data and removing the processing steps from input_fn.

As per the error on this post, it says 37 features when I was feeding only 16. Well, this is because the ML model in SageMaker processed the raw data and OneHotEncoded it creating more columns. Then, it encountered the processing steps I put into input_fn and tried to do it again.

I was using SageMaker with SKLearn models, it might be different with other models.

  • Related