Reading json file with corrupt_record in spark java

Time:11-21

I'm working on a Spark Java application (Spark version 2.7). I'm trying to load a multiline JSON file in which some records might not match my schema. I pass a schema while loading, but the entire file is rejected as one corrupt record even if only a single JSON object fails to satisfy that schema.

My JSON file looks something like this:

[
{Json_object},
{Json_object},
{Json_object}
]

I manually created a schema (a StructType) for it and load the file like this:

Dataset<Row> df = spark.read()
        .option("multiline", "true")
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .schema(schema)
        .json("filepath");

The issue is that a single JSON object that doesn't follow the schema spoils the whole file. For instance, if attribute1 has integer type in my schema but appears as a string in one of the JSON objects, I would expect only that object to end up in _corrupt_record. Instead I'm getting something like this:

+------------+---------------+---------------+
| attribute1 |   attribute2  |_corrupt_record|
+------------+---------------+---------------+
|    null    |     null      |             [{|
|            |               | all_json_obj  |
|            |               |          ...  |
|            |               |         }]    |
+------------+---------------+---------------+

It works absolutely fine with typical single-line JSON objects, where the newline character '\n' is used as a delimiter: no issues there, and the results are as expected. Can somebody tell me what I am missing here?

PS: The question is not limited to Spark Java; the behavior is the same in Scala and Python as well.

CodePudding user response:

By looking at your output, which I'm going to replicate here:

+------------+---------------+---------------+
| attribute1 |   attribute2  |_corrupt_record|
+------------+---------------+---------------+
|    null    |     null      |             [{|
|            |               | all_json_obj  |
|            |               |          ...  |
|            |               |         }]    |
+------------+---------------+---------------+

If you look at the first and the last line of the _corrupt_record value, you see [{ and }]. This makes me think that those { and } characters possibly should not be there. Is it possible that your JSON file actually looks something like this:

[{
{Json_object},
{Json_object},
{Json_object}
}]

If that is the case, the curly braces directly inside the top-level square brackets make it look as if the top-level array contains only one object, and that object has the wrong schema. Could you try removing the curly braces directly inside the square brackets of your array?

Just to give you a functioning example, consider the following json file:

[
    {
        "id": 1,
        "object": {
            "val1": "thisValue",
            "val2": "otherValue"
        }
    },
    {
        "id": 2,
        "object": {
            "val1": "hehe",
            "val2": "test"
        }
    },
    {
        "id": 3,
        "object": {
            "val1": "yes",
            "val2": "no"
        }
    }
]

Reading that JSON file in a spark-shell (Spark version 2.4.5) with the following command:

val df = spark.read.option("multiline", "true").json("test.json")

Gives me the following output:

scala> df.show(false)
+---+-----------------------+
|id |object                 |
+---+-----------------------+
|1  |[thisValue, otherValue]|
|2  |[hehe, test]           |
|3  |[yes, no]              |
+---+-----------------------+


scala> df.printSchema
root
 |-- id: long (nullable = true)
 |-- object: struct (nullable = true)
 |    |-- val1: string (nullable = true)
 |    |-- val2: string (nullable = true)

This is just a tiny example to give you something functioning.

But do have a look at those [{ and }] rows in your corrupt dataframe!

Hope it helps :)

CodePudding user response:

I am afraid that this is not going to work, at least with the current version of Spark.

I am not a Spark committer, but I did some investigation and here is what I found. I am not sure that this is 100% accurate, but maybe it will be useful for you (at least as a good starting point for further investigation).

I dug into the Spark code and found that there is a big difference between how multiline and standard files are read:

  • With multiline set to false, Spark uses TextInputJsonDataSource to read the file. Here is how the read operation looks in the Spark source code:

    override def readFile(
        conf: Configuration,
        file: PartitionedFile,
        parser: JacksonParser,
        schema: StructType): Iterator[InternalRow] = {
      val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
      val textParser = parser.options.encoding
        .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
        .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))
    
      val safeParser = new FailureSafeParser[Text](
        input => parser.parse(input, textParser, textToUTF8String),
        parser.options.parseMode,
        schema,
        parser.options.columnNameOfCorruptRecord)
      linesReader.flatMap(safeParser.parse)
    }
    

Here we can see that Spark reads the file line by line and then calls flatMap to process every line with the parser, so it is easy to isolate a malformed record later and generate a _corrupt_record entry for just that line.

When you set the multiline option to true, Spark uses MultiLineJsonDataSource (spoiler: it was previously called WholeFileJsonDataSource). Here is the function that reads the data, from the Spark source code:

  override def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: JacksonParser,
      schema: StructType): Iterator[InternalRow] = {
    def partitionedFileString(ignored: Any): UTF8String = {
      Utils.tryWithResource {
        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
      } { inputStream =>
        UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
      }
    }
    val streamParser = parser.options.encoding
      .map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
      .getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))

    val safeParser = new FailureSafeParser[InputStream](
      input => parser.parse[InputStream](input, streamParser, partitionedFileString),
      parser.options.parseMode,
      schema,
      parser.options.columnNameOfCorruptRecord)

    safeParser.parse(
      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
  }

Now let's take a look at JacksonParser and its generic function parse, again from the Spark source code:

  def parse[T](
      record: T,
      createParser: (JsonFactory, T) => JsonParser,
      recordLiteral: T => UTF8String): Iterable[InternalRow] = {
    try {
      Utils.tryWithResource(createParser(factory, record)) { parser =>
        // a null first token is equivalent to testing for input.trim.isEmpty
        // but it works on any token stream and not just strings
        parser.nextToken() match {
          case null => None
          case _ => rootConverter.apply(parser) match {
            case null => throw QueryExecutionErrors.rootConverterReturnNullError()
            case rows => rows.toSeq
          }
        }
      }
    } catch {
      case e: SparkUpgradeException => throw e
      case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
        // JSON parser currently doesn't support partial results for corrupted records.
        // For such records, all fields other than the field configured by
        // `columnNameOfCorruptRecord` are set to `null`
        throw BadRecordException(() => recordLiteral(record), () => None, e)
      case e: CharConversionException if options.encoding.isEmpty =>
        val msg =
          """JSON parser cannot handle a character in its input.
            |Specifying encoding as an input option explicitly might help to resolve the issue.
            |""".stripMargin + e.getMessage
        val wrappedCharException = new CharConversionException(msg)
        wrappedCharException.initCause(e)
        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
      case PartialResultException(row, cause) =>
        throw BadRecordException(
          record = () => recordLiteral(record),
          partialResult = () => Some(row),
          cause)
    }
  }

Here you can see that the JSON parser does not raise a PartialResultException, but probably one of these two: JsonProcessingException or MalformedInputException.

Because of that, the code throws BadRecordException(() => recordLiteral(record), () => None, e), where record is our stream, i.e. the whole file.

This exception is later interpreted by FailureSafeParser, which generates the output rows for you and simply copies the data into _corrupt_record.
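
To make the difference in record granularity concrete, here is a toy model in plain Java. It is not Spark code: the class RecordGranularityDemo, the Row type and the "one integer per line" format are all made up for illustration. The only thing it borrows from the sources above is the shape of the logic, namely a parser that throws on bad input, wrapped by a permissive handler that turns the failed record into a single corrupt row.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names exist in Spark.
class RecordGranularityDemo {

    /** One output row: a parsed value, or the raw text of a record that failed. */
    static final class Row {
        final Integer value;         // null when the record was corrupt
        final String corruptRecord;  // null when the record parsed cleanly

        Row(Integer value, String corruptRecord) {
            this.value = value;
            this.corruptRecord = corruptRecord;
        }
    }

    /** Hypothetical parser: a record holds integers separated by newlines and throws on the first bad one. */
    static List<Integer> parseRecord(String record) {
        List<Integer> values = new ArrayList<>();
        for (String token : record.split("\n")) {
            values.add(Integer.parseInt(token.trim()));
        }
        return values;
    }

    /** Permissive wrapper: any failure turns the WHOLE record into one corrupt row. */
    static List<Row> safeParse(String record) {
        List<Row> rows = new ArrayList<>();
        try {
            for (Integer value : parseRecord(record)) {
                rows.add(new Row(value, null));
            }
        } catch (NumberFormatException e) {
            rows.add(new Row(null, record));
        }
        return rows;
    }

    /** Analogue of multiline=false: every line is its own record. */
    static List<Row> readLineByLine(String file) {
        List<Row> rows = new ArrayList<>();
        for (String line : file.split("\n")) {
            rows.addAll(safeParse(line));
        }
        return rows;
    }

    /** Analogue of multiline=true: the whole file is a single record. */
    static List<Row> readWholeFile(String file) {
        return safeParse(file);
    }

    public static void main(String[] args) {
        String file = "1\noops\n3";
        System.out.println("line by line: " + readLineByLine(file).size() + " rows");
        System.out.println("whole file:   " + readWholeFile(file).size() + " rows");
    }
}
```

With the input "1\noops\n3", the line-by-line reader yields three rows and only "oops" lands in the corrupt column, while the whole-file reader yields a single row whose corrupt column holds the entire input. That mirrors the output shown in the question.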

In general, I tried to find information in commits and Jira, but I think this topic is a real mess. I found the initial commit that added this functionality, with this message:

[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.

Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

"the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure": it looks like this changed later (you actually get the literal JSON in this column), but I think the general approach is the same.

So, moving back to the question: "I want to know if it is an intended behavior or just a bug!" I think this is neither a bug nor intended behaviour, but a consequence of how the Jackson-based parser was initially implemented, and for the moment we have to live with it.
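
Since line-delimited input is handled record by record, one possible workaround (an assumption on my side, not something taken from the Spark documentation) is to convert the top-level array into one object per line before loading, and then read the result without the multiline option. Below is a minimal sketch in plain Java. The helper class JsonArrayToLines is hypothetical, it does not validate the JSON beyond checking for the opening bracket, and it keeps the whole document in memory, so it only suits files of moderate size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark or any JSON library.
class JsonArrayToLines {

    /**
     * Splits a top-level JSON array into its elements, each returned as compact
     * single-line JSON text. Whitespace outside string literals is dropped;
     * string literals (including escaped quotes) are copied verbatim.
     */
    static List<String> splitTopLevelArray(String json) {
        List<String> elements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;             // nesting level of [] and {}
        boolean inString = false;  // currently inside a "..." literal
        boolean escaped = false;   // previous char inside the literal was a backslash

        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                current.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (Character.isWhitespace(c)) {
                continue; // insignificant outside strings
            }
            if (depth == 0) {
                if (c != '[') {
                    throw new IllegalArgumentException("expected a top-level JSON array");
                }
                depth = 1; // the outer '[' itself is not part of any element
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '[' || c == '{') {
                depth++;
            } else if (c == ']' || c == '}') {
                depth--;
                if (depth == 0) { // closing bracket of the outer array
                    flush(current, elements);
                    continue;
                }
            } else if (c == ',' && depth == 1) { // separator between elements
                flush(current, elements);
                continue;
            }
            current.append(c);
        }
        return elements;
    }

    private static void flush(StringBuilder current, List<String> elements) {
        if (current.length() > 0) {
            elements.add(current.toString());
            current.setLength(0);
        }
    }

    public static void main(String[] args) {
        String json = "[\n  {\"a\": 1, \"b\": \"x, y\"},\n  {\"a\": \"bad\"}\n]";
        // One JSON object per output line.
        System.out.println(String.join("\n", splitTopLevelArray(json)));
    }
}
```

Each returned string can be written as one line of a new file, which can then be loaded with the same schema and PERMISSIVE options as in the question but without multiline, so that a mismatching object should be isolated the same way it is for ordinary single-line input.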
