Converting a list of dictionaries to json in Pyspark


I have a dataframe in which I build a column descr as a list of dictionaries:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (2022, 'A1', "cat", 'eng', 3, 56.768639),
        (2022, 'A1', "rabbit", 'eng', 10, 56.768639),
        (2022, 'A2', "dog", 'eng', 10, 54.114841),
        (2022, 'A2', "mouse", 'eng', 20, 81.114841),
    ],
    ["data", 'group', 'word', 'lang', 'count', 'value']
)
df2 = df\
    .groupBy('data', 'group', 'lang')\
    .agg(F.collect_list(F.to_json(F.struct(F.col('count'), F.col('value'), F.col('word')))).alias('descr'))

I want to convert the list of dicts to a compressed JSON string with a pandas_udf:

import base64
import gzip
import json

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def jsn(lst):
    return lst.apply(lambda lst: base64.b64encode(gzip.compress(json.dumps(lst).encode('utf-8'))).decode("utf-8"))

df3 = df2.withColumn('descr2', jsn(F.col('descr')))

But I get this error:

TypeError: Object of type ndarray is not JSON serializable
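To illustrate the failure outside Spark (a minimal sketch; the NumPy array stands in for the value each row of the collect_list column takes inside the pandas UDF):

```python
import json

import numpy as np

# A pandas_udf receives each row's collect_list result as a NumPy array,
# and json.dumps has no encoder for ndarray objects.
arr = np.array(['{"count": 3}', '{"count": 10}'], dtype=object)

try:
    json.dumps(arr)
except TypeError as e:
    print(e)  # Object of type ndarray is not JSON serializable

# Converting to a plain Python list does serialize, but it double-encodes
# the inner JSON strings instead of producing one JSON document.
print(json.dumps(arr.tolist()))
```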

CodePudding user response:

You are passing an array of JSON strings, not a single JSON string. Try this instead:

.agg(
    F.to_json(
        F.collect_list(F.struct(F.col('count'), F.col('value'), F.col('word')))
    ).alias('descr')
)

You need to apply the to_json function to the collected list of structs, not to each struct individually.
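With that change, descr is already one JSON string per row, so the UDF only needs to compress and base64-encode it; no json.dumps is required. A sketch of the UDF body operating on a pandas Series (which is what a pandas_udf receives); the function name compress_json is illustrative:

```python
import base64
import gzip
import json

import pandas as pd

def compress_json(s: pd.Series) -> pd.Series:
    # Each element of s is already a JSON string, so just gzip-compress
    # and base64-encode it.
    return s.apply(
        lambda js: base64.b64encode(gzip.compress(js.encode("utf-8"))).decode("utf-8")
    )

# Simulate one row of the corrected 'descr' column
descr = pd.Series(['[{"count":3,"value":56.768639,"word":"cat"}]'])
encoded = compress_json(descr)

# Round-trip check: base64-decode and decompress to recover the JSON
decoded = gzip.decompress(base64.b64decode(encoded[0])).decode("utf-8")
print(json.loads(decoded))
```

In Spark you would wrap it with pandas_udf(compress_json, StringType()) and apply it via withColumn, as in the question.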
