Home > Mobile >  Is there a way to guess the schema dynamically in Pyspark?
Is there a way to guess the schema dynamically in Pyspark?

Time:10-04

I have a table in Databricks, with a column as string dictionary as below -

 --- -------------------------------------------------------------------------------------------------------------- 
|id |stringDictionary                                                                                              |
 --- -------------------------------------------------------------------------------------------------------------- 
|abc|{"col1": "someValue", "col2" : "someValue", "col3" : "someValue"}                                             |
|def|{"col1" : "someValue", "col3": "someValue"}                                                                   |
|mnp|{"col1" : "someValue", "col2" : "someValue", "col3" : "someValue", "col4" : "someValue", "col5" : "someValue"}|
|abc|{"col4" : "someValue", "col5" : "someValue", "col6" : "someValue"}                                            |
 --- -------------------------------------------------------------------------------------------------------------- 

Now for each of the id's there can be multiple structures as shown.

I tried defining the Struct schema for few of them as below -

from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import col,from_json

schema = StructType([ 
    StructField("col1",StringType(),True), 
    StructField("col2",StringType(),True), 
    StructField("col3",StringType(),True), 
    StructField("col4", StringType(), True)
  ])

dfJSON = sparkDF.withColumn("jsonData",from_json(col("stringDictionary"),schema)) \
                   .select("stringDictionary","jsonData.*","*")#.drop("stringDictionary")

display(dfJSON)

But this is not a good approach reason being every time a new element is added it won't suffice and will have to change the schema manually.

Is there a way to handle all such scenarios or guess the struct schema whenever this table or dataframe is read and flatten its corresponding stringDictionary to be a separate column of its own?

Please help.

CodePudding user response:

Use schema_of_json() to dynamically make your schema, then use MergeSchema for schema evolution.

Be aware that in production environment, sometimes the json payload can be sent with wrong data type. Also, schema evolution doesn't work with nested structs.

CodePudding user response:

Convert sparkDF's stringDictionary column to MapType using the below code -

from pyspark.sql.functions import *
from pyspark.sql.types import *
df = (sparkDF.withColumn("cols", 
                          from_json( col("stringDictionary"), 
                                     MapType(StringType(), StringType())
                                    )
                         )
             .drop("stringDictionary")
       )

Now, column cols needs to be exploded as below -

df2 = df.select("id", explode("cols").alias("col_columns", "col_rows"))

display(df2)

Once, you have col_columns and col_rows as individual columns. All we need to do is pivot col_columns and aggregate it using its corresponding col_rows as below -

df3 = df2.groupBy("id")
         .pivot("col_columns")
         .agg(first("col_rows"))

display(df3)

  • Related