Issues while loading JSON in Azure Notebooks


Recently I've been trying to make some transformations on some JSON files in Azure Synapse notebooks using Scala, loading them in with the spark.read function. The problem is the following:

  • 1st case: I load it using a schema (via StructType) and the returned DataFrame is all null
  • 2nd case: I load it without the schema and it returns "_corrupt_record" (this happens with multiline = true, too)

I do not know what is happening, as I have tried to load several different JSON files and none of them work (they are normal JSON files downloaded from Kaggle, though). Here is one of the sample files, followed by roughly how I am loading it:

{
  "results": [{
    "columns": [{
      "name": "COD",
      "type": "NUMBER"
    }, {
      "name": "TECH",
      "type": "NUMBER"
    }, {
      "name": "OBJECT",
      "type": "NUMBER"
    }],
    "items": [{
      "cod": 3699,
      "tech": "-",
      "object": "Type 2"
    }, {
      "cod": 3700,
      "tech": 56,
      "object": "Type 1"
    }, {
      "cod": 3701,
      "tech": 20,
      "object": "No type"
    }]
  }]
}
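
The path and the schema below are simplified placeholders, not my exact code; they are only meant to illustrate the two calls:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

//Placeholder path and schema, only to illustrate the two cases
val samplePath = "abfss://<container>@<account>.dfs.core.windows.net/sample.json"
val placeholderSchema = StructType(Seq(StructField("results", StringType)))

//1st case: explicit schema -> every column in the returned DataFrame is null
val withSchemaDF = spark.read.schema(placeholderSchema).json(samplePath)
withSchemaDF.show()

//2nd case: no schema -> a single _corrupt_record column
val withoutSchemaDF = spark.read.json(samplePath)
withoutSchemaDF.show()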

CodePudding user response:

I got a similar kind of corrupt-record error when I tried to reproduce the issue.

As the sample data you shared spreads a single object across multiple lines and the JSON contains multiple objects nested inside it, I read the file with the multiline option set to true and then exploded each column and selected it.

//reading the JSON file from ADLS
val read_path = "abfss://[email protected]/sample.json"
val customers_data_path = spark.read.format("json").option("inferSchema","true").option("multiline","true").load(read_path)
customers_data_path.show()
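
Regarding the asker's 1st case: an explicit schema also works here, but only once multiline is enabled. The sketch below is one possible StructType for the shared sample (the field types are my assumption; tech is kept as a string because it mixes "-" and numbers). Without multiline, Spark reads each physical line as a separate record, and when a user-supplied schema has no _corrupt_record column the malformed rows simply come back as all nulls, which matches the 1st case.

import org.apache.spark.sql.types._

//Assumed schema for the sample JSON; adjust the types to your real data
val resultsSchema = StructType(Seq(
  StructField("results", ArrayType(StructType(Seq(
    StructField("columns", ArrayType(StructType(Seq(
      StructField("name", StringType),
      StructField("type", StringType)
    )))),
    StructField("items", ArrayType(StructType(Seq(
      StructField("cod", LongType),
      StructField("tech", StringType),
      StructField("object", StringType)
    ))))
  ))))
))

val schemaReadDF = spark.read.schema(resultsSchema).option("multiline", "true").json(read_path)
schemaReadDF.show(false)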

import org.apache.spark.sql.functions.{explode, col, lit}

//Exploding the results array so each element becomes its own row
val DF1 = customers_data_path.select(explode(col("results")).as("results"))
DF1.show()

//Selecting all columns from results
val DF2 = DF1.select(col("results.*"))
DF2.show()

//Further exploding the columns array while keeping items, then exploding items
val DF3 = DF2.select(explode(col("columns")).as("columns"), col("items"))
val DF4 = DF3.select(col("columns"), explode(col("items")).as("items"))
DF4.show()

  1. In this approach, every item object is paired with every columns object, so each item row is repeated once per column.
//Selecting all fields inside the columns and items structs
val DF5 = DF4.select(col("columns.*"), col("items.*"))
DF5.show()

  2. In this approach you will get null wherever an object does not have a value in a particular column.
//Exploding the columns and items arrays separately
val DF5 = DF2.select(explode(col("columns")).as("columns"))
DF5.show()

val DF6 = DF2.select(explode(col("items")).as("items"))
DF6.show()

//Selecting all fields inside the columns and items structs
val DF7 = DF5.select(col("columns.*"))
val DF8 = DF6.select(col("items.*"))
DF7.show()
DF8.show()

//Combining both of the above DataFrames with a full outer join on an always-false condition
val DF10 = DF7.join(DF8, lit(false), "full")
DF10.show()

  3. In this approach you will get a combined DataFrame in which each columns row is matched to an items row by position.
//Creating a sequential index column, joining the DataFrames on it, and then dropping it

import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.Row

val df11 = spark.createDataFrame(
  DF7.rdd.zipWithIndex.map {
    case (row, index) => Row.fromSeq(row.toSeq :+ index)
  },
  // Append the index field to the original schema
  StructType(DF7.schema.fields :+ StructField("index", LongType, false))
)

val df22 = spark.createDataFrame(
  DF8.rdd.zipWithIndex.map {
    case (row, index) => Row.fromSeq(row.toSeq :+ index)
  },
  // Append the index field to the original schema
  StructType(DF8.schema.fields :+ StructField("index", LongType, false))
)

val DF12 = df11.join(df22, Seq("index")).drop("index")

DF12.show()
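
If you would rather stay in the DataFrame API, the same positional join can be sketched with row_number over a constant ordering. Note this is only a convenience for small data like this sample: it pulls everything through a single partition and gives no guaranteed row order.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, lit}

//Assign a 1-based position to each row of both DataFrames, then join on it
val w = Window.orderBy(lit(1))
val indexedColumns = DF7.withColumn("index", row_number().over(w))
val indexedItems = DF8.withColumn("index", row_number().over(w))

val combined = indexedColumns.join(indexedItems, Seq("index")).drop("index")
combined.show()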
