I need to add an encrypted version of certain elements to a complex nested structure using Spark Streaming. The incoming JSON elements can have different schemas, so I'm looking for a dynamic solution in which I do not need to hardcode Spark schemas.
For instance, this is one of the JSONs I could get:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
        }
      }
    ]
  }
}
What I would like to accomplish is getting the following:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "ID": 1,
          "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
          "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
        }
      }
    ]
  }
}
Like I mentioned, the schemas could be different, so I may also get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
          }
        }
      }
    ],
    "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"
  }
}
And I would like to get something like this:
{
  "hello": "world",
  "thisisastruct": {
    "thisisanarray": [
      {
        "thisisanotherstruct": {
          "onemorestruct": {
            "ID": 1,
            "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
            "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
          }
        }
      }
    ],
    "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt",
    "thisisanothervaluetoenrcypt_masked": "TYHRBVTRHTYJTJ"
  }
}
I have a Python method to encrypt the value; however, I'm unable to dynamically change the struct. I think something like the following could be helpful, but unfortunately I do not have Scala experience, and I'm unable to translate it to PySpark and change it so that it adds a new field instead of changing the current value:
Change value of nested column in DataFrame
Any help would be greatly appreciated.
EDIT: This is the function I use to encrypt the data. I am doing it through a UDF, but I can change that if needed.
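import boto3

def encrypt_string(s):
    # key_id is assumed to be defined elsewhere (the KMS key ID or ARN)
    kms = boto3.client('kms', region_name='us-west-2')
    response = kms.encrypt(
        KeyId=key_id,
        Plaintext=str(s).encode('utf-8')  # pass the plaintext as bytes
    )
    # CiphertextBlob is returned as bytes
    return response['CiphertextBlob']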
CodePudding user response:
Since your JSON can have widely differing schemas, parsing it into a Spark struct would require a schema that is the union of all possible schemas, which is unwieldy because you do not know in advance what the JSON will look like. Hence, I suggest keeping the JSON string as is and using a UDF to parse the string into a dict, update the values, and return the result as a JSON string.
import json
from typing import Any, Dict, List

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Assumes an active SparkSession named `spark`.
data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}', ),
        ('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}', ), ]
df = spark.createDataFrame(data, ("json_col", ))

def encrypt(s: str) -> str:
    # Placeholder; swap in the real encryption function here.
    return f"encrypted_{s}"

def walk_dict(struct: Dict[str, Any], fields_to_encrypt: List[str]):
    # Iterate over a copy of the keys because the dict is modified in the loop.
    keys_copy = set(struct.keys())
    for k in keys_copy:
        if k in fields_to_encrypt and isinstance(struct[k], str):
            # Add the masked sibling next to the original value.
            struct[f"{k}_masked"] = encrypt(struct[k])
        else:
            walk_fields(struct[k], fields_to_encrypt)

def walk_fields(field: Any, fields_to_encrypt: List[str]):
    # Recurse into dicts and lists; scalar values are left untouched.
    if isinstance(field, dict):
        walk_dict(field, fields_to_encrypt)
    elif isinstance(field, list):
        for e in field:
            walk_fields(e, fields_to_encrypt)

def encrypt_fields(json_string: str) -> str:
    fields_to_encrypt = ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"]
    as_json = json.loads(json_string)
    walk_fields(as_json, fields_to_encrypt)
    return json.dumps(as_json)

field_encryption_udf = F.udf(encrypt_fields, StringType())
df.withColumn("encrypted", field_encryption_udf(F.col("json_col"))).show(truncate=False)
Output
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|json_col |encrypted |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}} |{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}]}} |
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt", "thisisanothervaluetoenrcypt_masked": "encrypted_imtheothervaluetoencrypt"}}|
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
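One thing to watch when swapping the placeholder encrypt for the KMS-based function from the question: that function returns response['CiphertextBlob'], which is bytes, and json.dumps cannot serialize bytes. Below is a minimal sketch of one way to handle that, assuming base64 text is acceptable for the "_masked" value. The names mask_fields and fake_encrypt are hypothetical and only for illustration, and fake_encrypt is a stand-in for the real KMS call, not real encryption. This variant also builds a new structure instead of mutating the parsed one, so the "_masked" key lands right after the original key.

```python
import base64
import json
from typing import Any, Callable, Iterable


def mask_fields(value: Any, fields: Iterable[str],
                encrypt: Callable[[str], bytes]) -> Any:
    """Return a copy of `value` with a `<key>_masked` sibling added
    for every string field whose key is listed in `fields`."""
    targets = set(fields)
    if isinstance(value, dict):
        out = {}
        for key, child in value.items():
            if key in targets and isinstance(child, str):
                out[key] = child
                # The ciphertext is bytes; base64 turns it into JSON-safe text.
                out[f"{key}_masked"] = base64.b64encode(encrypt(child)).decode("ascii")
            else:
                out[key] = mask_fields(child, targets, encrypt)
        return out
    if isinstance(value, list):
        return [mask_fields(item, targets, encrypt) for item in value]
    # Scalars (numbers, booleans, None, non-target strings) pass through.
    return value


def fake_encrypt(s: str) -> bytes:
    # Hypothetical stand-in for the KMS call; NOT real encryption.
    return s[::-1].encode("utf-8")


def encrypt_fields(json_string: str, fields: Iterable[str],
                   encrypt: Callable[[str], bytes] = fake_encrypt) -> str:
    return json.dumps(mask_fields(json.loads(json_string), fields, encrypt))


sample = '{"a": {"items": [{"secret": "abc", "ID": 1}]}, "secret": 5}'
result = json.loads(encrypt_fields(sample, ["secret"]))
print(result)
```

To use it with Spark, encrypt_fields could be wrapped in F.udf in the same way as above, with the real encryption function passed in place of fake_encrypt. If the KMS client is created inside the encryption function, it is probably worth creating it once per executor rather than once per value, since a new client for every field is likely to be slow.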