I need to transform a list of JSONs into PySpark DataFrames. The JSONs all have the same schema. The problem is that the value entries of the dicts in the JSON have different data types.
Example: the field complex is an array of dicts, and each dict has four keys of different types (integer, string, float and a nested dict). See below for an example JSON.
If I use df = spark.createDataFrame(json_list)
to create my DataFrame from the JSONs, PySpark "deletes" some of the data because it cannot infer the schema correctly. PySpark decides that the complex field should have the type ArrayType(MapType(StringType(), LongType())), which leads to the non-LongType values being nulled (a minimal reproduction follows the example JSON below).
I tried to supply a schema, but a MapType forces one specific DataType on all value fields of the nested dicts, while my values vary in type:
myschema = StructType([
    StructField("Id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("sentTimestamp", LongType(), True),
    StructField("complex", ArrayType(MapType(StringType(), StringType())), True)
])
The MapType(StringType(), StringType()) means the value fields in the dicts that are not strings are nulled, as they cannot be mapped.
It seems that PySpark can only handle dicts as maps if all value data types are the same.
How can I convert the JSON to a PySpark DataFrame without losing data?
[{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}]
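For reference, this is a minimal sketch of the call that drops the data (json_str is an assumed name holding the example JSON above as a string):

import json

json_list = json.loads(json_str)       # json_str: the example JSON above, as a string
df = spark.createDataFrame(json_list)  # schema gets inferred
df.printSchema()                       # complex is inferred as array<map<string,bigint>>
df.show(truncate=False)                # non-long values inside "complex" come back as null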
CodePudding user response:
You can specify the schema of the complex column as an array of structs.
from pyspark.sql.types import (
    StructType, StructField, ArrayType,
    StringType, LongType, DoubleType, IntegerType,
)

myschema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("sentTimestamp", LongType(), True),
        StructField(
            "complex",
            ArrayType(StructType(
                [
                    StructField("key1", LongType(), True),
                    StructField("key2", StringType(), True),
                    # key3 and innerkey2 hold floats in the sample data, so use DoubleType
                    StructField("key3", DoubleType(), True),
                    StructField(
                        "key4",
                        StructType(
                            [
                                StructField("innerkey1", StringType(), True),
                                StructField("innerkey2", DoubleType(), True),
                                StructField("innerkey3", IntegerType(), True),
                            ]
                        )
                    )
                ]
            ))
        )
    ]
)
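With this schema the list of dicts can be passed straight to createDataFrame. A minimal usage sketch, assuming json_list is the parsed list from the question:

df = spark.createDataFrame(json_list, schema=myschema)
df.printSchema()
df.show(truncate=False)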
CodePudding user response:
If you do not want to pass a schema, or you want Spark to detect the schema itself (Spark 3.0+), you can write the JSON into a table:
%sql
CREATE TABLE newtable AS SELECT
'{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}' as original
Convert the table into a DataFrame:
df1 = spark.sql('select * from newtable')
Convert the single column of the table into an RDD of JSON strings:
from pyspark.sql.functions import col, from_json

rdd = df1.select(col("original").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)
Leverage spark.read.json to infer the schema from the RDD and store it in a variable:
newschema = spark.read.json(rdd).schema
Assign the inferred schema to the column using from_json and select:
df3 = df1.select("*", from_json("original", newschema).alias("transrequest"))
df3.select('transrequest.*').show(truncate=False)
+-------+----------------------------------------------------------------+---------+-------------+
|Id     |complex                                                         |name     |sentTimestamp|
+-------+----------------------------------------------------------------+---------+-------------+
|2345123|[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|something|1646732402   |
+-------+----------------------------------------------------------------+---------+-------------+
schema
root
|-- Id: string (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: long (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: double (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: double (nullable = true)
| | | |-- innerkey3: long (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
CodePudding user response:
Adding to @过过招's answer, below is the approach I would personally use, since it involves less code when defining the DataFrame schema.
Input JSON
jsonstr = """[{
"Id": "2345123",
"name": "something",
"sentTimestamp": 1646732402,
"complex":
[
{
"key1": 1,
"key2": "(1)",
"key3": 0.5,
"key4":
{
"innerkey1": "random",
"innerkey2": 5.4,
"innerkey3": 1
}
},
{
"key1": 2,
"key2": "(2)",
"key3": 0.5,
"key4":
{
"innerkey1": "left",
"innerkey2": 7.8,
"innerkey3": 1
}
}
]
}]"""
Converting this to an RDD:
import json
rdd = sc.parallelize(json.loads(jsonstr))
Creating the DataFrame:
df = spark.createDataFrame(rdd, 'Id string, name string, sentTimestamp long, complex array<struct<key1:int, key2:string, key3:float, key4:struct<innerkey1:string,innerkey2:float,innerkey3:int>>>')
df.show(truncate=False)
#Output Data
+-------+---------+-------------+----------------------------------------------------------------+
|Id     |name     |sentTimestamp|complex                                                         |
+-------+---------+-------------+----------------------------------------------------------------+
|2345123|something|1646732402   |[{1, (1), 0.5, {random, 5.4, 1}}, {2, (2), 0.5, {left, 7.8, 1}}]|
+-------+---------+-------------+----------------------------------------------------------------+
#Output Schema
root
|-- Id: string (nullable = true)
|-- name: string (nullable = true)
|-- sentTimestamp: long (nullable = true)
|-- complex: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: integer (nullable = true)
| | |-- key2: string (nullable = true)
| | |-- key3: float (nullable = true)
| | |-- key4: struct (nullable = true)
| | | |-- innerkey1: string (nullable = true)
| | | |-- innerkey2: float (nullable = true)
| | | |-- innerkey3: integer (nullable = true)
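Either way, once complex is an array of structs instead of a map, the nested values keep their types and can be queried directly. A small sketch, assuming the df created above:

from pyspark.sql import functions as F

# Explode the array so each inner struct becomes its own row,
# then select the typed nested fields.
df.select("Id", F.explode("complex").alias("c")) \
  .select("Id", "c.key1", "c.key3", "c.key4.innerkey2") \
  .show()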