Home > Enterprise >  Mixed schema datatype JSON to PySpark DataFrame
Mixed schema datatype JSON to PySpark DataFrame

Time:03-29

I need to transform a list of JSONs into pySpark DataFrames. The JSON all have the same Schema. The problem is that the value-entries of the dicts in the JSON have different data types.

Example: The field complex is an Array of Dicts, the Dicts has four keys but of different types (Integer, String, Float and a nested Dict). See below for an example JSON.

If I use df = spark.createDataFrame(json_list) to create my DataFrame from the jsons, pyspark "deletes" some of the data as he cannot infer the Schema correctly. PySpark decides that the Schema of the complex-field should be: StructType("complex", ArrayType(MapType(StringType(), LongType()))) which leads to the non-LongType values being nulled.

I tried to supply a schema, but since I need to set a specific (?) DataType for the value fields of the nested MapType - which is not uniform, but varies...

myschema = StructType([
                             StructField("Id", StringType(), True),
                             StructField("name", StringType(), True),
                             StructField("sentTimestamp", LongType(), True),
                             StructType("complex", ArrayType(MapType(StringType(), StringType())))
                             ])

The MapType(StringType(), StringType()))) means some value-fields in the dict are being nulled as it cannot be mapped.

It seems that PySpark can only handle dicts if all data types of the values are the same.

How can I convert the JSON to a pyspark DataFrame without loosing data?

[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]

CodePudding user response:

You can specify the schema of the complex column as an array of struct.

myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    StructField("key3", StringType(), True),
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", StringType(), True),
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)

CodePudding user response:

If you do not want to pass a schema or want spark to detect schema from 3.0 you can write json into a table

%sql

CREATE TABLE newtable AS SELECT
'{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}'as original

Convert the table into a dataframe

df1 =spark.sql('select * from newtable')

rdd the single column in the table

rdd=df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)

Leverage .read to read the rdd schema and set is avariable

newschema=spark.read.json(rdd).schema

Assign schema to column using select

df3=df1.select("*",from_json("original", newschema).alias("transrequest"))

df3.select('transrequest.*').show(truncate=False)

 ------- ---------------------------------------------------------------- --------- ------------- 
|Id     |complex                                                         |name     |sentTimestamp|
 ------- ---------------------------------------------------------------- --------- ------------- 
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402   |
 ------- ---------------------------------------------------------------- --------- ------------- 

schema

root
 |-- Id: string (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: long (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: double (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: double (nullable = true)
 |    |    |    |-- innerkey3: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)

CodePudding user response:

Adding to @过过招 's answer, below is the approach I would personally use since it involves lesser code while defining the dataframe schema.

Input JSON

jsonstr = """[{
    "Id": "2345123",
    "name": "something",        
    "sentTimestamp": 1646732402,
    "complex":
    [
        {
            "key1": 1,
            "key2": "(1)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "random",
                "innerkey2": 5.4,
                "innerkey3": 1
            }
        },
        {
            "key1": 2,
            "key2": "(2)",
            "key3": 0.5,
            "key4":
            {
                "innerkey1": "left",
                "innerkey2": 7.8,
                "innerkey3": 1
            }
        }
    ]
}]"""

Converting this to a RDD -

import json

rdd = sc.parallelize(json.loads(jsonstr))

Creating the dataframe -

df=spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)

#Output Data
 ------- --------- ------------- ---------------------------------------------------------------- 
|Id     |name     |sentTimestamp|complex                                                         |
 ------- --------- ------------- ---------------------------------------------------------------- 
|2345123|something|1646732402   |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
 ------- --------- ------------- ---------------------------------------------------------------- 

#Output Schema
root
 |-- Id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sentTimestamp: long (nullable = true)
 |-- complex: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key3: float (nullable = true)
 |    |    |-- key4: struct (nullable = true)
 |    |    |    |-- innerkey1: string (nullable = true)
 |    |    |    |-- innerkey2: float (nullable = true)
 |    |    |    |-- innerkey3: integer (nullable = true)

  • Related