Following is our PySpark application code snippet:
schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)
df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*')) \
    .na.drop() \
    .withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
|   name| version|                                  requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test|       1|  {"data": {"score": 130, "group": "silver"}}|  1|
|kj-test|       1|    {"data": {"score": 250, "group": "gold"}}|  2|
|kj-test|       1|{"data": {"score": 330, "group": "platinum"}}|  3|
+-------+--------+---------------------------------------------+---+
The decrypt_udf UDF function snippet:
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str
When I write the Spark dataframe to the S3 bucket as follows:
df_new.write.mode('overwrite').json(path=s3outputpath)
The resulting file has the following content. Here the value of requestBody is written as a string, hence wrapped in double quotes with the inner double quotes escaped:
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 130, \"group\": \"silver\"}}","id":"1"}
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 250, \"group\": \"gold\"}}","id":"2"}
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 330, \"group\": \"platinum\"}}","id":"3"}
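This escaping is standard JSON encoding of a column that Spark sees as a plain string, not Spark-specific behaviour. A minimal stdlib check outside Spark (the record shape here is illustrative):

```python
import json

# requestBody exactly as Spark sees it: a plain Python string
inner = '{"data": {"score": 130, "group": "silver"}}'

# Serializing the record with requestBody as a *string* escapes the inner quotes...
as_string = json.dumps({"requestBody": inner})

# ...while serializing the parsed *object* keeps the nested JSON intact.
as_object = json.dumps({"requestBody": json.loads(inner)})

print(as_string)
print(as_object)
```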
However, I am expecting the value of requestBody to be written as JSON, as below:
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
I understand that I have specified the type of requestBody as a string in the schema (StructField('requestBody', StringType(), True)), and hence the output is written that way. How can I achieve the output I am expecting? There is no such type as JsonType in Spark.
EDIT:
Please note that my requestBody schema will not always be like this: {"data": {"score": 130, "group": "silver"}}. It is fixed for a given run, but another run could have a totally different schema. Essentially, I need a way to infer the schema from the JSON string. I found some SO posts that could be helpful and will try these out:
https://stackoverflow.com/a/45880574/948268
Spark from_json with dynamic schema
CodePudding user response:
Try the code below (I have not tested it). Convert the requestBody JSON string to a struct using the from_json function.
schema = StructType(
    [
        StructField('name', StringType(), True),
        StructField('version', StringType(), True),
        StructField('requestBody', StringType(), True),
        StructField('id', StringType(), True),
    ]
)
Prepare a schema for requestBody:
requestSchema = StructType(
    [
        StructField('data', StructType([
            StructField('group', StringType(), True),
            StructField('score', LongType(), True),
        ])),
    ]
)
df_new = df.withColumn('value', from_json('value', schema)) \
    .where(col('value.version') == '1') \
    .select(col('value.*')) \
    .na.drop() \
    .withColumn('requestBody', from_json('requestBody', requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)
CodePudding user response:
In your UDF, use json.dumps to convert the decrypted Python object to a JSON string:
import json

@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
    ...
    ...
    # decrypted_obj is the decrypted Python object (e.g. a dict)
    return json.dumps(decrypted_obj)
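One caveat worth noting (my note, not part of the original answer): json.dumps must be applied to the decoded object, not to a string that is already JSON, otherwise the value is encoded a second time and the quotes get escaped again:

```python
import json

already_a_string = '{"score": 130}'  # decryption already produced a JSON string

# Dumping a string JSON-encodes it again: the quotes get escaped.
double_encoded = json.dumps(already_a_string)

# Decode first, then dump, to get clean JSON out.
properly_encoded = json.dumps(json.loads(already_a_string))

print(double_encoded)    # prints "{\"score\": 130}"
print(properly_encoded)  # prints {"score": 130}
```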