Create json column from multiple rows using pyspark

Time:03-12

I would like to have a column in a PySpark DataFrame in JSON format.

Example df:

id type value
1 a 11
1 b 12
2 c 21

Expected outcome:

id json
1 {"a":"11","b":"12","c":""}
2 {"a":"","b":"","c":"21"}

I tried to use

df.groupBy(df.id) \
    .agg(collect_list(to_json(create_map(df.type, df.value))).alias('json'))

but it returns one JSON object per row instead of a single merged object, like this: {{"a":"11"},{"b":"12"}}

Can someone help me with this? Thank you!

CodePudding user response:

Example df:

df  = spark.createDataFrame(
      [
        ('1','a','11'),
        ('1','b','12'),
        ('2','c','21')
      ], ['id','type','value']
    )
from pyspark.sql import functions as F

df.groupBy("id")\
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
    .withColumn('json', F.to_json('type_value'))\
    .show(truncate=False)

+---+------------------+-------------------+
|id |type_value        |json               |
+---+------------------+-------------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|
|2  |{c -> 21}         |{"c":"21"}         |
+---+------------------+-------------------+
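
To see why the entries have to be merged before they are serialized, here is a plain-Python model of the two aggregation orders. It is not Spark code, only an illustration: `rows` mirrors the example df, and `per_row_json`, `merged` and `merged_json` are made-up names.

```python
import json
from collections import defaultdict

# Same three rows as the example df: (id, type, value)
rows = [("1", "a", "11"), ("1", "b", "12"), ("2", "c", "21")]

# Model of collect_list(to_json(create_map(type, value))):
# every row is serialized first, so each id gets a list of small objects.
per_row_json = defaultdict(list)

# Model of to_json(map_from_entries(collect_list(struct(type, value)))):
# the entries are gathered into one map per id and serialized once.
merged = defaultdict(dict)

for id_, type_, value in rows:
    per_row_json[id_].append(json.dumps({type_: value}, separators=(",", ":")))
    merged[id_][type_] = value

merged_json = {id_: json.dumps(m, separators=(",", ":")) for id_, m in merged.items()}

print(per_row_json["1"])  # ['{"a":"11"}', '{"b":"12"}']
print(merged_json["1"])   # {"a":"11","b":"12"}
```

Note that neither order adds the missing keys ("c" for id 1), so the result above still differs from the expected outcome in the question.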

CodePudding user response:

There's an easier way to do this. See the logic below.

Input DataFrame:

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([StructField("id", StringType(), True), StructField("type", StringType(), True), StructField("value", StringType(), True)])

df  = spark.createDataFrame([('1','a','11'),('1','b','12'),('2','c','21')], schema)
df.show(truncate=False)

+---+----+-----+
| id|type|value|
+---+----+-----+
|  1|   a|   11|
|  1|   b|   12|
|  2|   c|   21|
+---+----+-----+

First, pivot on the type column and aggregate with the first value found for each type, as below -

df1 = df.groupBy("id").pivot("type").agg(first("value"))
df1.show()

+---+----+----+----+
| id|   a|   b|   c|
+---+----+----+----+
|  1|  11|  12|null|
|  2|null|null|  21|
+---+----+----+----+

Once you have this, replace the null values with empty strings before building the JSON. This step is needed because Spark omits null fields when it creates a JSON column from a struct type. See below -

df1.fillna("").withColumn("json", to_json(struct(col("a"), col("b"), col("c")))).drop("a", "b", "c").show(truncate=False)

+---+--------------------------+
|id |json                      |
+---+--------------------------+
|1  |{"a":"11","b":"12","c":""}|
|2  |{"a":"","b":"","c":"21"}  |
+---+--------------------------+
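
The column names a, b and c are hard-coded in the snippet above. Here is a minimal sketch that reads them from the pivoted DataFrame instead, assuming the same id/type/value layout with string values. `value_columns` and `pivot_to_json` are hypothetical helper names, and the PySpark import sits inside the function only so that the pure-Python helper can be tried without a Spark installation.

```python
def value_columns(columns, id_col="id"):
    """Return every column name except the grouping column, in order."""
    return [c for c in columns if c != id_col]


def pivot_to_json(df, id_col="id", key_col="type", value_col="value"):
    """Pivot key_col into columns, blank out nulls, and pack them into JSON."""
    # Imported lazily; assumes PySpark is installed wherever this is called.
    from pyspark.sql import functions as F

    pivoted = df.groupBy(id_col).pivot(key_col).agg(F.first(value_col))
    cols = value_columns(pivoted.columns, id_col)
    return (
        pivoted
        .fillna("", subset=cols)  # nulls would otherwise be dropped by to_json
        .select(id_col, F.to_json(F.struct(*cols)).alias("json"))
    )


print(value_columns(["id", "a", "b", "c"]))  # ['a', 'b', 'c']
```

With the example df, `pivot_to_json(df).show(truncate=False)` should give the same json column as above, but this has only been sketched here, not run against a cluster.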

CodePudding user response:

df  = spark.createDataFrame(
      [
        ('1','a','11'),
        ('1','b','12'),
        ('2','c','21')
      ], ['id','type','value']
    )

import json

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def input_type(json_str):
    # Parse the JSON string with json.loads rather than eval
    dic_json = json.loads(json_str)
    # Start from empty defaults so that every expected key is present
    filled = {**{x: '' for x in ['a', 'b', 'c']}, **dic_json}
    # Serialize back to a compact JSON string to match the declared StringType
    return json.dumps(filled, separators=(',', ':'))

input_type_udf = F.udf(input_type, StringType())

df.groupBy("id")\
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
    .withColumn('json', F.to_json('type_value'))\
    .withColumn('dic_json', input_type_udf(F.col('json')))\
    .show(truncate=False)
+---+------------------+-------------------+--------------------------+
|id |type_value        |json               |dic_json                  |
+---+------------------+-------------------+--------------------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|{"a":"11","b":"12","c":""}|
|2  |{c -> 21}         |{"c":"21"}         |{"a":"","b":"","c":"21"}  |
+---+------------------+-------------------+--------------------------+
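
The key list ['a', 'b', 'c'] is fixed inside the UDF above. If the set of types is not known in advance, one option is to collect it from the data and close over it. This is only a sketch under that assumption: `make_filler` and `add_filled_json` are hypothetical names, and the PySpark imports are kept inside the function so the pure-Python part can be checked on its own.

```python
import json


def make_filler(keys):
    """Build a function that adds '' for every key missing from a JSON object."""
    defaults = {k: "" for k in keys}

    def fill(json_str):
        parsed = json.loads(json_str)
        # Defaults first, so the given keys keep a stable order in the output.
        return json.dumps({**defaults, **parsed}, separators=(",", ":"))

    return fill


def add_filled_json(df, id_col="id", key_col="type", value_col="value"):
    """Group by id_col and emit one JSON string per id with all keys present."""
    # Imported lazily; assumes PySpark is installed wherever this is called.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Collecting distinct keys to the driver is fine for a small key set.
    keys = sorted(r[0] for r in df.select(key_col).distinct().collect())
    fill_udf = F.udf(make_filler(keys), StringType())

    entries = F.collect_list(F.struct(key_col, value_col))
    grouped = df.groupBy(id_col).agg(
        F.to_json(F.map_from_entries(entries)).alias("json")
    )
    return grouped.withColumn("json", fill_udf("json"))


fill = make_filler(["a", "b", "c"])
print(fill('{"c":"21"}'))  # {"a":"","b":"","c":"21"}
```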