I have a JSON file which looks like this:
{
"tags":[
"Real_send",
"stopped"
],
"messages":{
"7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
"channel":"channel 1",
"name":"Version 1",
"alert":"\ud83d\ude84 alert 1"
},
"c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
"channel":"channel 1",
"name":"name 1",
"type":"type 1"
},
"b869886f-0f9c-487f-8a43-abe3d6456678":{
"channel":"channel 2",
"name":"Version 2",
"alert":"\ud83d\ude84 alert 2"
}
}
}
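I want the output to have one row per message: the message key in an ID column, followed by the `alert`, `channel`, `type` and `name` fields of that message.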
When I print the schema I get the below schema from spark
StructType(List(
StructField(messages,
StructType(List(
StructField(7c2e9284-993d-4eb4-ad6b-6a2bfcc51060,
StructType(List(
StructField(alert,StringType,true),
StructField(channel,StringType,true),
StructField(name,StringType,true))),true),
StructField(b869886f-0f9c-487f-8a43-abe3d6456678,StructType(List(
StructField(alert,StringType,true),
StructField(channel,StringType,true),
StructField(name,StringType,true))),true),
StructField(c2cbd05c-5452-476c-bdc7-ac31ed3417f9,StructType(List(
StructField(channel,StringType,true),
StructField(name,StringType,true),
StructField(type,StringType,true))),true))),true),
StructField(tags,ArrayType(StringType,true),true)))
Basically, each key under `messages` (for example `7c2e9284-993d-4eb4-ad6b-6a2bfcc51060`) should be treated as the value of my ID column.
My code looks like:
# Attempt 1: pull the per-message fields out of `messages` with json_tuple.
# NOTE: this raises the AnalysisException quoted below -- json_tuple requires
# string arguments, while `messages` was inferred as a struct column.
cols_list_to_select_from_flattened = ['alert', 'channel', 'type', 'name']
df = df \
.select(
F.json_tuple(
F.col('messages'), *cols_list_to_select_from_flattened
)
.alias(*cols_list_to_select_from_flattened))
# Print the first row without truncating long values.
df.show(1, False)
Error message:
E pyspark.sql.utils.AnalysisException: cannot resolve 'json_tuple(`messages`, 'alert', 'channel', 'type', 'name')' due to data type mismatch: json_tuple requires that all arguments are strings;
E 'Project [json_tuple(messages#0, alert, channel, type, name) AS ArrayBuffer(alert, channel, type, name)]
E - Relation[messages#0,tags#1] json
I also tried to list all keys like below
# Attempt 2: explode `messages` to enumerate its keys.
# NOTE: this raises the AnalysisException quoted below -- explode/posexplode
# accept only array or map input, and `messages` was inferred as a struct.
df.withColumn("map_json_column", F.posexplode_outer(F.col("messages"))).show()
But I got this error:
E pyspark.sql.utils.AnalysisException: cannot resolve 'posexplode(`messages`)' due to data type mismatch: input to function explode should be array or map type, not struct<7c2e9284-993d-4eb4-ad6b-6a2bfcc51060:struct<alert:string,channel:string,name:string>,b869886f-0f9c-487f-8a43-abe3d6456678:struct<alert:string,channel:string,name:string>,c2cbd05c-5452-476c-bdc7-ac31ed3417f9:struct<channel:string,name:string,type:string>>;
E 'Project [messages#0, tags#1, generatorouter(posexplode(messages#0)) AS map_json_column#5]
E - Relation[messages#0,tags#1] json
How can I get the desired output?
CodePudding user response:
When reading JSON you can specify your own schema. Instead of letting the `messages` column be inferred as a struct type, declare it as a map type; then you can simply explode that column.
Here is a self-contained example with your data:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
# Self-contained example: read the JSON document with an explicit schema in
# which `messages` is a map (message id -> record) rather than a struct, then
# explode the map into one row per message.

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Sample document. `messages` is an object keyed by message id; the set of
# fields differs from one message to the next (some have `alert`, some `type`).
json_sample = """
{
"tags":[
"Real_send",
"stopped"
],
"messages":{
"7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
"channel":"channel 1",
"name":"Version 1",
"alert":"lert 1"
},
"c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
"channel":"channel 1",
"name":"name 1",
"type":"type 1"
},
"b869886f-0f9c-487f-8a43-abe3d6456678":{
"channel":"channel 2",
"name":"Version 2",
"alert":" alert 2"
}
}
}
"""

# Wrap the whole document in a one-element RDD of strings so it can be fed
# to the JSON reader in place of a file path.
data = spark.sparkContext.parallelize([json_sample])

# Fields to extract from every message entry.
cols_to_select = ['alert', 'channel', 'type', 'name']

# Schema of a single message entry. Only the fields listed in
# `cols_to_select` are parsed. Every field is nullable because, in the
# sample, no message carries all four of them.
message_schema = StructType([
    StructField(col_name, StringType(), True) for col_name in cols_to_select
])

# Schema of the complete document.
# - `tags` is an array of strings in the data, so it is declared as
#   ArrayType(StringType) instead of a plain StringType.
# - `messages` is declared as a map from message id to `message_schema`;
#   a map column (unlike a struct column) can be passed to explode().
json_schema = StructType([
    StructField("tags", ArrayType(StringType(), True), False),
    StructField("messages", MapType(StringType(), message_schema, False), False),
])

# Read the JSON and parse it against the explicit schema.
# A file path can be passed here instead of the sample RDD.
df = spark.read.schema(json_schema).json(data)

# Explode the map column into `key` / `value` columns (one row per message),
# then expose the key as `id` and lift the required fields out of `value`.
df = (
    df
    .select(F.explode(F.col("messages")))
    .select(
        F.col("key").alias("id"),
        *[F.col(f"value.{col_name}").alias(col_name) for col_name in cols_to_select]
    )
)

# Print the result without truncating long cell values.
df.show(truncate=False)