I would like to have a column in a PySpark df in JSON format.
example df:
id | type | value |
---|---|---|
1 | a | 11 |
1 | b | 12 |
2 | c | 21 |
expected outcome:
id | json |
---|---|
1 | {"a":"11","b":"12","c":""} |
2 | {"a":"","b":"","c":"21"} |
I tried to use
df.groupBy(df.id) \
    .agg(collect_list(to_json(create_map(df.type, df.value))).alias('json'))
but it returns an array of separate one-key objects like this: [{"a":"11"},{"b":"12"}]
Can someone help me with this? Thank you!!
CodePudding user response:
example df:
df = spark.createDataFrame(
    [
        ('1','a','11'),
        ('1','b','12'),
        ('2','c','21')
    ], ['id','type','value']
)
from pyspark.sql import functions as F

# Collect the (type, value) pairs per id into one map, then serialize the map to JSON
df.groupBy("id")\
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
    .withColumn('json', F.to_json('type_value'))\
    .show(truncate=False)
+---+------------------+-------------------+
|id |type_value        |json               |
+---+------------------+-------------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|
|2  |{c -> 21}         |{"c":"21"}         |
+---+------------------+-------------------+
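Note this keeps only the types that actually occur for each id, so id 1 gets no "c" key. If you also need the missing types as empty strings, as in the expected outcome, one possible sketch (not the only way) is to cross join the distinct ids with the distinct types and fill the gaps before building the map:
from pyspark.sql import functions as F

# Give every id a row for every type, with '' where no value exists
ids = df.select("id").distinct()
types = df.select("type").distinct()

full = ids.crossJoin(types)\
    .join(df, ["id", "type"], "left")\
    .withColumn("value", F.coalesce(F.col("value"), F.lit("")))

full.groupBy("id")\
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
    .withColumn("json", F.to_json("type_value"))\
    .show(truncate=False)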
CodePudding user response:
There's an easier way to do this. See the logic below -
Input_DF
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("value", StringType(), True)
])
df = spark.createDataFrame([('1','a','11'),('1','b','12'),('2','c','21')], schema)
df.show(truncate=False)
+---+----+-----+
| id|type|value|
+---+----+-----+
|  1|   a|   11|
|  1|   b|   12|
|  2|   c|   21|
+---+----+-----+
First, pivot the `type` column and aggregate it using its corresponding `value`, as below -
df1 = df.groupBy("id").pivot("type").agg(first("value"))
df1.show()
+---+----+----+----+
| id|   a|   b|   c|
+---+----+----+----+
|  1|  11|  12|null|
|  2|null|null|  21|
+---+----+----+----+
Once you have this, you have to replace the `null` values with their string equivalent, because Spark drops `null` fields when creating a `json` column from a `struct` type - otherwise the missing keys would not appear in the output at all. See below -
# Fill nulls with the placeholder string "null", serialize the struct to JSON,
# then strip the placeholder so the missing keys end up as empty strings
df1.fillna("null")\
    .withColumn("json", regexp_replace(to_json(struct(col("a"), col("b"), col("c"))), "null", ""))\
    .drop("a", "b", "c")\
    .show(truncate=False)
+---+--------------------------+
|id |json                      |
+---+--------------------------+
|1  |{"a":"11","b":"12","c":""}|
|2  |{"a":"","b":"","c":"21"}  |
+---+--------------------------+
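One caveat with the `regexp_replace` step: it would also blank out a literal "null" occurring inside a real value. A sketch of a variant (same `df1`, with the pivoted columns assumed to be a, b, c) that fills the empty strings per column before serializing, so no string surgery is needed:
from pyspark.sql.functions import coalesce, col, lit, struct, to_json

cols = ["a", "b", "c"]  # the pivoted type columns of df1

# Coalesce each pivoted column to '' and serialize the struct directly
df1.withColumn("json", to_json(struct(*[coalesce(col(c), lit("")).alias(c) for c in cols])))\
    .drop(*cols)\
    .show(truncate=False)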
CodePudding user response:
df = spark.createDataFrame(
    [
        ('1','a','11'),
        ('1','b','12'),
        ('2','c','21')
    ], ['id','type','value']
)
from pyspark.sql import functions as F
from pyspark.sql.types import *

# UDF that parses the JSON string and adds the missing keys with empty values
def input_type(json):
    dic_json = eval(json)
    for x in ['a', 'b', 'c']:
        if x not in dic_json:
            dic_json[x] = ''
    return dic_json

input_type_udf = F.udf(input_type, StringType())

df.groupBy("id")\
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
    .withColumn('json', F.to_json('type_value'))\
    .withColumn('dic_json', input_type_udf(F.col('json')))\
    .show(truncate=False)
+---+------------------+-------------------+----------------+
|id |type_value        |json               |dic_json        |
+---+------------------+-------------------+----------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|{a=11, b=12, c=}|
|2  |{c -> 21}         |{"c":"21"}         |{a=, b=, c=21}  |
+---+------------------+-------------------+----------------+
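Note that `dic_json` above is not valid JSON: the UDF returns a Python dict while its declared return type is `StringType`, so Spark stringifies the map (hence `{a=11, b=12, c=}`). A sketch of a fix that returns a proper JSON string, using `json.loads` instead of `eval` (which is risky on untrusted input):
import json

def input_type(js):
    # Parse, fill the missing keys with '', and re-serialize to a JSON string
    dic_json = json.loads(js)
    for x in ['a', 'b', 'c']:
        if x not in dic_json:
            dic_json[x] = ''
    return json.dumps(dic_json)

input_type_udf = F.udf(input_type, StringType())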