from_json converting all values to null


I am reading a string containing multiple JSONs and converting them to multiple columns in a PySpark dataframe. The JSON elements may or may not have null values. My code works fine when all elements in the JSON are non-null. However, if a single element is null, it makes all the elements null. Here's an example:

Input: Note that addresses looks like an array but it is actually a string.

id='1'
addresses='
[{
    "city": "city1",
    "state": null,
    "street": null,
    "postalCode": null,
    "country": "country1"
}
,
{
    "city": "city2",
    "state": null,
    "street": "street2",
    "postalCode": "11111",
    "country": "country2"
}]'

Expected output:

id  city    state   street  postalCode  country
1   city1   null    null    null    country1
1   city2   null    street2 11111   country2

My code:

addl_addr_schema = ArrayType(StructType([
    StructField("addl_addr_city", StringType(), True),
    StructField("addl_addr_state", StringType(), True),
    StructField("addl_addr_street", StringType(), True),
    StructField("addl_addr_postalCode", StringType(), True),
    StructField("addl_addr_country", StringType(), True),
]))


dpDF_transformed = dpDF_temp.withColumn('addresses_transformed', from_json('addresses', addl_addr_schema)) \
                                    .withColumn('addl_addr', explode_outer('addresses_transformed'))

dpDF_transformed = dpDF_transformed.select(
    "*",
    col("addresses_transformed.addl_addr_street").alias("addl_addr_street_array"),
    col("addresses_transformed.addl_addr_city").alias("addl_addr_city_array"),
    col("addresses_transformed.addl_addr_state").alias("addl_addr_state_array"),
    col("addresses_transformed.addl_addr_postalCode").alias("addl_addr_postalCode_array"),
    col("addresses_transformed.addl_addr_country").alias("addl_addr_country_array"),
)

dpDF_final = dpDF_transformed.withColumn("addl_addr_street", concat_ws(",", "addl_addr_street_array")) \
                             .withColumn("addl_addr_city", concat_ws(",", "addl_addr_city_array")) \
                             .withColumn("addl_addr_state", concat_ws(",", "addl_addr_state_array")) \
                             .withColumn("addl_addr_postalCode", concat_ws(",", "addl_addr_postalCode_array")) \
                             .withColumn("addl_addr_country", concat_ws(",", "addl_addr_country_array")) \
                             .drop("addresses", "addresses_transformed", "addl_addr",
                                   "addl_addr_street_array", "addl_addr_city_array",
                                   "addl_addr_state_array", "addl_addr_postalCode_array",
                                   "addl_addr_country_array")

Output I am getting

id  city    state   street  postalCode  country
1   city1   null    null    null    null
1   city2   null    null    null    null

I believe what is happening is that from_json is seeing a type mismatch: I have defined every element as StringType(), but some elements are actually NullType. How do I deal with this? The attributes may or may not be null. I thought setting nullable = True while defining the schema would help, but it doesn't seem to.

CodePudding user response:

You're on the right track. You've identified that you need a schema, that you need to explode the array, and that you then extract the columns.

Something went wrong in your from_json call: your schema (with fields like addl_addr_state) and your data (with keys like state) do not use the same names. Since none of the schema's field names match a key in the JSON, Spark concludes there is no useful data in there and returns null for every field. You need to make sure that the field names in your schema exactly match the keys in your data.

You can do all of this in a simpler way, using some neat tricks. The following code will get you where you want:

from pyspark.sql.types import ArrayType, StringType, StructType, StructField
from pyspark.sql.functions import from_json, explode

id='1'
addresses="""
[{
    "city": "city1",
    "state": null,
    "street": null,
    "postalCode": null,
    "country": "country1"
}
,
{
    "city": "city2",
    "state": null,
    "street": "street2",
    "postalCode": "11111",
    "country": "country2"
}]"""


schema = ArrayType(StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("street", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
]))

# Reading in the dataframe with the raw json string in the addresses column
df = spark.createDataFrame([(id, addresses)], ["id", "addresses"])

# Parsing in our json and exploding to have a single line per city
parsed_df = df.withColumn("addresses", explode(from_json("addresses", schema)))

parsed_df.show(truncate=False)
+---+----------------------------------+
|id |addresses                         |
+---+----------------------------------+
|1  |[city1,,,, country1]              |
|1  |[city2,, street2, 11111, country2]|
+---+----------------------------------+

# Unwrapping the addresses column with the "struct.*" notation
unwrapped_df = parsed_df.select("id", "addresses.*")

unwrapped_df.show(truncate=False)
+---+-----+-----+-------+----------+--------+
|id |city |state|street |postalCode|country |
+---+-----+-----+-------+----------+--------+
|1  |city1|null |null   |null      |country1|
|1  |city2|null |street2|11111     |country2|
+---+-----+-----+-------+----------+--------+

So as you see, properly reading in the data and then a few small manipulations (from_json, explode, select("struct.*")) give you quite an easy way to solve the problem.

Hope this helps!
