I have a table in BigQuery that I am reading from using spark java. However when I try and access a columns nested JSON values I am unable to using from_json.
The column 'sender' has this structure:
{"UserInfo":{"CorporateEmailAddress":"[email protected]","UUID":32341983,"FirstName":"John","FirmNumber":678,"PersonalEmailAddress":"[email protected]","LastName":"Doe","AccountName":"AccountName","AccountNumber":12345}}
And I create the Nested struct types like below:
StructType userInfo = new StructType();
userInfo.add("CorporateEmailAddress", DataTypes.StringType, false);
userInfo.add("UUID", DataTypes.IntegerType, false);
userInfo.add("FirstName", DataTypes.StringType, false);
userInfo.add("FirmNumber", DataTypes.IntegerType, false);
userInfo.add("PersonalEmailAddress", DataTypes.StringType, false);
userInfo.add("LastName", DataTypes.StringType, false);
userInfo.add("AccountName", DataTypes.StringType, false);
userInfo.add("AccountNumber", DataTypes.IntegerType, false);
StructType schema = new StructType();
schema = schema.add("UserInfo", userInfo, false);
Then using spark I use withColumn and from_json using my schema to create a column in which I can access the nested fields in the JSON string.
SparkSession spark = SparkSession.builder()
.appName("spark-bigquery-pipeline")
.getOrCreate();
Dataset<Row> df = spark.read().format("bigquery")
.option("table", "table-email-data").load();
Dataset<Row> jsonColumnDataset = df.withColumn("jsonCol",functions.from_json(df.col("sender"), schema));
jsonColumnDataset.printSchema();
jsonColumnDataset.select("jsonCol").show();
However on running this code my output is:
-------
|jsonCol|
-------
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
| {{}}|
So it looks like I am not correctly parsing the JSON and creating a blank column?
EDIT: And here is the output of the jsonColumnDataset.printSchema();
root
|-- message_ID: string (nullable = false)
|-- msg_time: string (nullable = false)
|-- msg_time_UTC: string (nullable = false)
|-- msg_lang: string (nullable = false)
|-- subject: string (nullable = false)
|-- msg_body: string (nullable = false)
|-- disc_ref: string (nullable = true)
|-- greeting: string (nullable = true)
|-- sender: string (nullable = false)
|-- recipient: string (nullable = false)
|-- attachment: string (nullable = true)
|-- jsonCol: struct (nullable = true)
| |-- UserInfo: struct (nullable = true)
Can anyone see what might be wrong with my approach?
CodePudding user response:
Issue probably lies in your created schema structure.
You didn't create all json fields as StructField
.
Created schema
StructType userInfo = new StructType(new StructField[] {
new StructField("CorporateEmailAddress", DataTypes.StringType, false, Metadata.empty()),
new StructField("UUID", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("FirstName", DataTypes.StringType, false, Metadata.empty()),
new StructField("FirmNumber", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("PersonalEmailAddress", DataTypes.StringType, false, Metadata.empty()),
new StructField("LastName", DataTypes.StringType, false, Metadata.empty()),
new StructField("AccountName", DataTypes.StringType, false, Metadata.empty()),
new StructField("AccountNumber", DataTypes.IntegerType, false, Metadata.empty())
}
);
StructType schema = new StructType();
schema = schema.add(new StructField("UserInfo", userInfo, false, Metadata.empty()));
Printed the data
Dataset<Row> jsonColumnDataset = customerDF.withColumn("jsonCol", functions.from_json(customerDF.col("sender"), schema));
jsonColumnDataset.select("jsonCol").show(false);
Output
---------------------------------------------------------------------------
|jsonCol |
---------------------------------------------------------------------------
|[[[email protected],32341983,John,678,[email protected],Doe,AccountName,12345]]|
---------------------------------------------------------------------------
Print new Schema
jsonColumnDataset.printSchema();
root
|-- sender: string (nullable = true)
|-- jsonCol: struct (nullable = true)
| |-- UserInfo: struct (nullable = false)
| | |-- CorporateEmailAddress: string (nullable = false)
| | |-- UUID: integer (nullable = false)
| | |-- FirstName: string (nullable = false)
| | |-- FirmNumber: integer (nullable = false)
| | |-- PersonalEmailAddress: string (nullable = false)
| | |-- LastName: string (nullable = false)
| | |-- AccountName: string (nullable = false)
| | |-- AccountNumber: integer (nullable = false)
Access Individual Fields from newly added json column
jsonColumnDataset.select("jsonCol.UserInfo.CorporateEmailAddress").show(false);
jsonColumnDataset.select("jsonCol.UserInfo.UUID").show(false);
Output
|CorporateEmailAddress|
---------------------
|[email protected] |
---------------------
--------
|UUID |
--------
|32341983|
--------