How to specify the type of PySpark Dataframe column as JSON


The following is a snippet of our PySpark application code:

from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)

df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*'))\
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody')))

df_new.show()

+-------+--------+---------------------------------------------+---+
|   name| version|                                  requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test|       1|{"data": {"score": 130, "group": "silver"}}  |  1|
|kj-test|       1|{"data": {"score": 250, "group": "gold"}}    |  2|
|kj-test|       1|{"data": {"score": 330, "group": "platinum"}}|  3|
+-------+--------+---------------------------------------------+---+

A snippet of the decrypt_udf function:

@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
    ...
    ...
    return decrypted_json_str

When I write the Spark dataframe to the S3 bucket as follows:

df_new.write.mode('overwrite').json(path=s3outputpath)

The resulting file has the following content. Here the value of requestBody is written as a string, so it appears in double quotes with the inner double quotes escaped.

{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 130, \"group\": \"silver\"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{\"data\": {\"score\": 250, \"group\": \"gold\"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{\"data\": {\"score\": 330, \"group\": \"platinum\"}}","id":"1"}

However, I am expecting the value of requestBody to be written as a JSON object, as below.

{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}

I understand that I specified the type of requestBody as a string in the schema (StructField('requestBody', StringType(), True)), which is why I see the output that way. How can I achieve the output I am expecting? There is no such type as JsonType.


EDIT:

Please note that my requestBody schema will not always look like {"data": {"score": 130, "group": "silver"}}. For a given run it is fixed, but another run could have a totally different schema.

Essentially, I need a way to infer the schema from the JSON string itself. I found some SO posts which could be helpful and will try them out (see the sketch after the links):

https://stackoverflow.com/a/45880574/948268
Spark from_json with dynamic schema
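
As a starting point, a minimal sketch of that idea (untested; it assumes spark is the active SparkSession and that all rows within a given run share one requestBody schema):

from pyspark.sql.functions import col, from_json

# Let Spark scan the decrypted JSON strings and infer a schema that
# covers every requestBody value in this run.
inferred_schema = spark.read.json(
    df_new.rdd.map(lambda row: row['requestBody'])
).schema

# Re-parse the column with the inferred schema so it becomes a struct.
df_dynamic = df_new.withColumn(
    'requestBody', from_json(col('requestBody'), inferred_schema)
)
df_dynamic.write.mode('overwrite').json(path=s3outputpath)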

CodePudding user response:

Try the code below (I have not tested it).

Convert the requestBody JSON string to a struct using the from_json function.

schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)

Prepare a schema for requestBody:

from pyspark.sql.types import LongType  # in addition to the types imported above

requestSchema = StructType(
    [
        StructField('data', StructType([
            StructField('group', StringType(), True),
            StructField('score', LongType(), True),
        ])),
    ]
)

df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*')) \
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody'))) \
    .withColumn('requestBody', from_json('requestBody', requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)
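
With requestBody parsed into a struct, the JSON writer emits it as a nested object rather than an escaped string, which matches the expected output shown in the question.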

CodePudding user response:

In your UDF, convert the decrypted Python object to a JSON string with json.dumps:

import json

@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
    ...
    ...
    return json.dumps(decrypted_obj)
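
Note that json.dumps expects a Python object such as a dict. If the decrypt step already produces a JSON-formatted string, return it as-is (or json.loads it first), since dumping a string again would escape it a second time.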