In Palantir Foundry, can I find which CSV file is causing schema errors in a dataset?

I'm seeing errors like the following when building downstream of some datasets containing CSV files:

Caused by: java.lang.IllegalStateException: Header specifies 185 column types but line split into 174: "SUSPECT STRING","123...

or

Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Exception parsing 'SUSPECT STRING' into a IntegerType$ for column "COLOUR_ID": Unable to deserialize value using com.palantir.spark.parsers.text.converters.IntegerConverter. The value being deserialized was: SUSPECT STRING

Looking at the errors, it seems that some of my CSV files have the wrong schema. How can I find which ones?

CodePudding user response:

One technique you could use would be to:

  1. create a transform that reads the CSV files in as if they were unstructured text files, then
  2. filter the resulting DataFrame down to just the suspect rows, as identified by the extracts contained in the error message

Below is an example of such a transform:

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Read each file as unstructured text, keep only the lines containing
    # the extract quoted in the error message, and tag each row with the
    # path of the file it came from
    parsed_dfs = []
    for file_name in paths:
        parsed_df = (
            spark_session.read.text(file_name)
            .filter(F.col("value").contains(F.lit("SUSPECT STRING")))
            .withColumn("_filename", F.lit(file_name))
        )
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    output_dataset=Output("my_output"),
    input_dataset=Input("my_input"),
)
def compute(ctx, input_dataset, output_dataset):
    session = ctx.spark_session
    input_filesystem = input_dataset.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build the full path of every file in the input dataset
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    output_dataset.write_dataframe(output_df)

This would then output the rows of interest along with the paths to the files they're in.
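
If the bad rows are identified by a column-count mismatch rather than a literal value (as in the first error above, where the header specifies 185 columns but a line splits into 174), the same text-based read can instead count delimiters per line. Below is a minimal sketch of that variant, assuming comma-delimited files whose quoted fields contain no embedded commas; the function name and the expected_columns default (185, taken from the error) are illustrative:

from pyspark.sql import functions as F
from transforms.verbs.dataframes import union_many


def find_mismatched_lines(spark_session, paths, expected_columns=185):
    # Read each file as raw text, count the comma-separated fields on each
    # line, and keep only the lines whose count differs from the header's
    parsed_dfs = []
    for file_name in paths:
        parsed_df = (
            spark_session.read.text(file_name)
            .withColumn("_num_columns", F.size(F.split(F.col("value"), ",")))
            .filter(F.col("_num_columns") != expected_columns)
            .withColumn("_filename", F.lit(file_name))
        )
        parsed_dfs += [parsed_df]
    return union_many(*parsed_dfs, how="wide")

Selecting the distinct values of _filename from either transform's output then gives the list of files to repair or re-upload.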
