Exploding struct column values in pyspark


I have a JSON file which looks like this:

{
   "tags":[
      "Real_send",
      "stopped"
   ],
   "messages":{
      "7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
         "channel":"channel 1",
         "name":"Version 1",
         "alert":"\ud83d\ude84 alert 1"
      },
      "c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
         "channel":"channel 1",
         "name":"name 1",
         "type":"type 1"
      },
      "b869886f-0f9c-487f-8a43-abe3d6456678":{
         "channel":"channel 2",
         "name":"Version 2",
         "alert":"\ud83d\ude84 alert 2"
      }
   }
}

I want the output to look like below (the message keys become an `id` column, with one row per message):

+------------------------------------+-------+---------+------+---------+
|id                                  |alert  |channel  |type  |name     |
+------------------------------------+-------+---------+------+---------+
|7c2e9284-993d-4eb4-ad6b-6a2bfcc51060|alert 1|channel 1|null  |Version 1|
|c2cbd05c-5452-476c-bdc7-ac31ed3417f9|null   |channel 1|type 1|name 1   |
|b869886f-0f9c-487f-8a43-abe3d6456678|alert 2|channel 2|null  |Version 2|
+------------------------------------+-------+---------+------+---------+

When I print the schema, I get the following from Spark:

StructType(List(
StructField(messages,
StructType(List(
   StructField(7c2e9284-993d-4eb4-ad6b-6a2bfcc51060,
   StructType(List(
      StructField(alert,StringType,true),
      StructField(channel,StringType,true),
      StructField(name,StringType,true))),true),
   StructField(b869886f-0f9c-487f-8a43-abe3d6456678,StructType(List(
      StructField(alert,StringType,true),
      StructField(channel,StringType,true),
      StructField(name,StringType,true))),true),
   StructField(c2cbd05c-5452-476c-bdc7-ac31ed3417f9,StructType(List(
      StructField(channel,StringType,true),
      StructField(name,StringType,true),
      StructField(type,StringType,true))),true))),true),
StructField(tags,ArrayType(StringType,true),true)))

Basically, 7c2e9284-993d-4eb4-ad6b-6a2bfcc51060 (the message key) should be treated as my ID column.

My code looks like:

cols_list_to_select_from_flattened = ['alert', 'channel', 'type', 'name']    
df = df \
        .select(
                F.json_tuple(
                    F.col('messages'), *cols_list_to_select_from_flattened
                )
                .alias(*cols_list_to_select_from_flattened))

df.show(1, False)

Error message:

E               pyspark.sql.utils.AnalysisException: cannot resolve 'json_tuple(`messages`, 'alert', 'channel', 'type', 'name')' due to data type mismatch: json_tuple requires that all arguments are strings;
E               'Project [json_tuple(messages#0, alert, channel, type, name) AS ArrayBuffer(alert, channel, type, name)]
E                - Relation[messages#0,tags#1] json

I also tried to list all keys like below

df.withColumn("map_json_column", F.posexplode_outer(F.col("messages"))).show()

But got error

  E               pyspark.sql.utils.AnalysisException: cannot resolve 'posexplode(`messages`)' due to data type mismatch: input to function explode should be array or map type, not struct<7c2e9284-993d-4eb4-ad6b-6a2bfcc51060:struct<alert:string,channel:string,name:string>,b869886f-0f9c-487f-8a43-abe3d6456678:struct<alert:string,channel:string,name:string>,c2cbd05c-5452-476c-bdc7-ac31ed3417f9:struct<channel:string,name:string,type:string>>;
E               'Project [messages#0, tags#1, generatorouter(posexplode(messages#0)) AS map_json_column#5]
E                - Relation[messages#0,tags#1] json

How can I get the desired output?

CodePudding user response:

When reading the JSON you can specify your own schema. Instead of the messages column being a struct type, make it a map type; then you can simply explode that column.

Here is a self-contained example with your data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

json_sample = """
{
   "tags":[
      "Real_send",
      "stopped"
   ],
   "messages":{
      "7c2e9284-993d-4eb4-ad6b-6a2bfcc51060":{
         "channel":"channel 1",
         "name":"Version 1",
         "alert":"alert 1"
      },
      "c2cbd05c-5452-476c-bdc7-ac31ed3417f9":{
         "channel":"channel 1",
         "name":"name 1",
         "type":"type 1"
      },
      "b869886f-0f9c-487f-8a43-abe3d6456678":{
         "channel":"channel 2",
         "name":"Version 2",
         "alert":"alert 2"
      }
   }
}
"""

data = spark.sparkContext.parallelize([json_sample])

cols_to_select = ['alert', 'channel', 'type', 'name']

# Schema of a single message entry; only the columns we
# want to select are parsed. Fields must be nullable,
# since not every message has every key in your sample.
message_schema = StructType([
    StructField(col_name, StringType(), True) for col_name in cols_to_select
])

# The complete document schema. Declaring messages as a
# MapType turns the UUID field names into map keys.
json_schema = StructType([
    StructField("tags", ArrayType(StringType()), True),
    StructField("messages", MapType(StringType(), message_schema), True),
])

# Read json and parse to specific schema
# Here instead of sample data you can use file path 
df = spark.read.schema(json_schema).json(data)

# explode the map column and select the required columns
df = (
    df
        .select(F.explode(F.col("messages")))
        .select(
            F.col("key").alias("id"),
            *[F.col(f"value.{col_name}").alias(col_name) for col_name in cols_to_select]
        )
)

df.show(truncate=False)