I need to get all columns of a Spark DataFrame and create another column containing a JSON object whose keys are the column names and whose values are the column values. For example, a DataFrame like this:
C1 | C2 | CN  |
---|----|-----|
10 | 20 | abc |
99 |    | cde |
40 | 50 |     |
Should be transformed to this:
C1 | C2 | CN  | JSON                              |
---|----|-----|-----------------------------------|
10 | 20 | abc | {"C1": 10, "C2": 20, "CN": "abc"} |
99 |    | cde | {"C1": 99, "CN": "cde"}           |
40 | 50 |     | {"C1": 40, "C2": 50}              |
The column names and the number of columns may vary, so I can't pass them explicitly. The strategy I'm using is:
import json

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Keep only the columns with truthy values so empty/null fields are omitted.
    fields = {}
    for k, v in row.asDict().items():
        if v:
            fields[k] = v
    return json.dumps(fields)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn(
    'JSON', jsonize_udf(struct(*spark_data_frame.columns)))
This works well, but it degrades performance a lot. So I would like to convert it to a solution that doesn't use a UDF. Is that possible?
CodePudding user response:
Just found it:
from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON', to_json(struct(*spark_data_frame.columns)))
And if you want to ignore empty values, replace them with nulls first (to_json drops null fields by default, so they disappear from the output):

from pyspark.sql.functions import col, when

spark_data_frame = spark_data_frame.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c)
     for c in spark_data_frame.columns])
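For reference, here's a minimal end-to-end sketch of both steps together. The column names and sample values mirror the example above (all columns are strings here so the empty-string cleanup has something to do), and an existing SparkSession is assumed:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, to_json, when

spark = SparkSession.builder.getOrCreate()

# Sample data mirroring the question; empty strings stand in for missing values.
df = spark.createDataFrame(
    [("10", "20", "abc"), ("99", "", "cde"), ("40", "50", "")],
    ["C1", "C2", "CN"])

# Step 1: turn empty strings into nulls so to_json will drop those fields.
df = df.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

# Step 2: serialize all columns of each row into a JSON string, natively in the JVM.
df = df.withColumn("JSON", to_json(struct(*df.columns)))

df.show(truncate=False)
# JSON column values:
#   {"C1":"10","C2":"20","CN":"abc"}
#   {"C1":"99","CN":"cde"}
#   {"C1":"40","C2":"50"}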
CodePudding user response:
There isn't a straightforward way to build the JSON by hand without a UDF. However, PySpark actually has a built-in to_json function for turning a STRUCT into a JSON string, so you don't need to write your own. Behind the scenes, to_json calls Spark's internal implementation of the function, which removes the overhead associated with Python UDFs and should improve performance considerably.
The usage is very similar to your custom UDF:
from pyspark.sql.functions import struct, to_json
spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns)))
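One behavioral difference worth noting: like your original UDF, to_json omits null fields by default. If you ever need the null keys preserved, to_json accepts the same options as the JSON data source; the sketch below assumes Spark 3.x, where the ignoreNullFields write option is available:

from pyspark.sql.functions import struct, to_json

# Keep null columns as explicit JSON nulls instead of dropping them (Spark 3.x).
spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns), {'ignoreNullFields': 'false'}))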