convert column of dictionaries to columns in pyspark dataframe

Time:10-06

I have the PySpark DataFrame df below, with the schema shown. I've also included some sample data and the desired output. The problem is that the attributes column holds dictionaries, and I would like to create a new column for each key in those dictionaries. However, the values in the attributes column are strings, so I'm having trouble using explode or from_json.

I made an attempt based on another SO post using explode; the code I ran and the resulting error appear below the example data and desired output.

Also, I don't know in advance what all the keys in the dicts might be, since different records have dicts of different lengths.

Does anyone have a suggestion for how to do this? I was thinking of converting the DataFrame to pandas and solving it that way, but I'm hoping there's a better/faster PySpark solution.

df.schema

StructType(List(StructField(id,IntegerType,true),StructField(attributes,StringType,true)))




df.show()

 ---------------------------------------------------------------------------- 
|id     |attributes                                                          |
 ---------------------------------------------------------------------------- 
|1341205|{"general": ["P1"]}                                                 |
 ---------------------------------------------------------------------------- 
|553654 |{"flavor": ["CITRUS"], "general": ["SOIL"]}                         |
 ---------------------------------------------------------------------------- 
|573786 |{"flavor": ["BERRY", "SWEET"], "general": ["CLEAN GREEN CERTIFIED"]}|
 ---------------------------------------------------------------------------- 
|2621805|{"flavor": ["SWEET"]}                                               |
 ---------------------------------------------------------------------------- 

desired output:

 ---------------------------------------------------- 
|id     |general                  |flavor            |
 ---------------------------------------------------- 
|1341205|["P1"]                   |                  |
 ---------------------------------------------------- 
|553654 |["SOIL"]                 |["CITRUS"]        |
 ---------------------------------------------------- 
|573786 |["CLEAN GREEN CERTIFIED"]|["BERRY", "SWEET"]|
 ---------------------------------------------------- 
|2621805|                         |["SWEET"]         |
 ---------------------------------------------------- 

failed attempt:

from pyspark.sql.functions import udf, explode
import json

@udf("map<string, bigint>")
def parse(s):
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        pass 


df.select('id',explode(parse('attributes'))).show(truncate=False)

error:

An error was encountered:
An error occurred while calling o467.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 21, ip-10-100-190-45.us-west-2.compute.internal, executor 46): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt3/yarn/usercache/livy/appcache/application_1633358712221_0006/container_1633358712221_0006_01_000051/pyspark.zip/pyspark/worker.py", line 377, in main
    process()

....

Answer:

Try using the from_json function with a schema that matches the data to parse the JSON string. Keys missing from a given record come back as null:

import pyspark.sql.functions as F

json_schema = "general array<string>, flavor array<string>"

df = df.withColumn("x", F.from_json("attributes", json_schema)).select("id", "x.*")
df.show(truncate=False)

# +-------+-----------------------+--------------+
# |id     |general                |flavor        |
# +-------+-----------------------+--------------+
# |1341205|[P1]                   |null          |
# |553654 |[SOIL]                 |[CITRUS]      |
# |573786 |[CLEAN GREEN CERTIFIED]|[BERRY, SWEET]|
# |2621805|null                   |[SWEET]       |
# +-------+-----------------------+--------------+
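
The schema above hard-codes the two keys from the sample data, while the question says the full set of keys is not known in advance. One possible way to handle that is to discover the keys from the data first and build the schema string from them. The sketch below is only an illustration under some assumptions: the helper names build_ddl_schema and flatten_attributes are hypothetical (they are not part of PySpark), every value is assumed to be an array of strings, and every key is assumed to be a plain identifier that needs no quoting in a DDL schema string.

```python
import json


def build_ddl_schema(json_strings):
    """Return a DDL schema such as 'flavor array<string>, general array<string>'.

    Hypothetical helper: assumes every value is a list of strings and every
    key is a plain identifier.
    """
    keys = set()
    for s in json_strings:
        try:
            parsed = json.loads(s)
        except (TypeError, ValueError):
            continue  # skip None values and malformed JSON
        if isinstance(parsed, dict):
            keys.update(parsed.keys())
    # Sort the keys so the resulting column order is deterministic.
    return ", ".join(f"{key} array<string>" for key in sorted(keys))


def flatten_attributes(df):
    """Expand df.attributes into one column per key found in the data."""
    import pyspark.sql.functions as F  # imported lazily; requires PySpark

    # Pull the distinct JSON strings to the driver to discover the keys.
    rows = df.select("attributes").distinct().collect()
    schema = build_ddl_schema(row.attributes for row in rows)
    return df.withColumn("x", F.from_json("attributes", schema)).select("id", "x.*")


# The schema-building step can be checked without a Spark session:
samples = [
    '{"general": ["P1"]}',
    '{"flavor": ["CITRUS"], "general": ["SOIL"]}',
    '{"flavor": ["BERRY", "SWEET"], "general": ["CLEAN GREEN CERTIFIED"]}',
    '{"flavor": ["SWEET"]}',
]
print(build_ddl_schema(samples))  # flavor array<string>, general array<string>
```

With a live session, flatten_attributes(df).show(truncate=False) should give the same columns as the hard-coded schema, in alphabetical order. Note that collecting the distinct attribute strings to the driver may be expensive on a large DataFrame; inferring the schema with spark.read.json over the attributes strings is an alternative worth considering in that case.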