Rewrite PySpark function to join columns into JSON


I need to take all the columns of a Spark DataFrame and create another column containing a JSON string whose keys are the column names and whose values are the column values. For example, a DataFrame like this:

C1   C2   CN
10   20   abc
99        cde
40   50

Should be transformed to this:

C1   C2   CN    JSON
10   20   abc   {"C1": 10, "C2": 20, "CN": "abc"}
99        cde   {"C1": 99, "CN": "cde"}
40   50         {"C1": 40, "C2": 50}

The column names and their number may vary, so I can't pass them explicitly. The strategy I'm using is:

import json

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Keep only fields with truthy values (note: this also drops 0 and False).
    fields = {k: v for k, v in row.asDict().items() if v}
    return json.dumps(fields)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn(
    'JSON', jsonize_udf(struct(*spark_data_frame.columns)))

This works, but it degrades performance a lot. So, I would like to convert it to a solution that doesn't use a UDF. Is that possible?

CodePudding user response:

Just found it:

from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON', to_json(struct(*spark_data_frame.columns)))

And if you also want to ignore empty strings (to_json already omits nulls), replace them with nulls first:

from pyspark.sql.functions import col, when

# Replace empty strings with null so to_json leaves those fields out.
spark_data_frame = spark_data_frame.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c)
     for c in spark_data_frame.columns])
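
Putting both snippets together, here's a minimal end-to-end sketch (the session setup and sample data are illustrative assumptions, not from the question; values are typed as strings so empty strings can stand in for the missing cells):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, to_json, when

spark = SparkSession.builder.getOrCreate()

# Sample data mirroring the question; missing cells are empty strings.
df = spark.createDataFrame(
    [("10", "20", "abc"), ("99", "", "cde"), ("40", "50", "")],
    ["C1", "C2", "CN"],
)

# Step 1: turn empty strings into nulls so to_json will drop those keys.
df = df.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

# Step 2: serialize every column into a single JSON string column.
df = df.withColumn("JSON", to_json(struct(*df.columns)))

df.show(truncate=False)
# JSON values: {"C1":"10","C2":"20","CN":"abc"}
#              {"C1":"99","CN":"cde"}
#              {"C1":"40","C2":"50"}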

CodePudding user response:

You don't need a custom UDF for this: PySpark has a built-in to_json function for turning a struct into a JSON string, so there's no need to write your own.

Behind the scenes, to_json calls Spark's native JVM implementation of the function, which removes the serialization overhead associated with Python UDFs and should improve performance.

The usage is very similar to your custom UDF:

from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns))
)
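
A quick way to confirm the Python UDF overhead is really gone is to inspect the physical plan: the UDF-based version shows a BatchEvalPython stage, while the to_json version runs entirely on the JVM.

# No BatchEvalPython (Python UDF) stage should appear in this plan.
spark_data_frame.explain()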