How to add a column to complex Spark structures with Python?

Time:12-11

I need to add an encrypted version of certain elements to a complex nested structure using Spark Streaming. The incoming JSON documents can have different schemas, so I'm looking for a dynamic solution in which I don't need to hardcode Spark schemas.

For instance, this is one of the JSON documents I could get:

{
   "hello": "world",
   "thisisastruct": {
       "thisisanarray": [
           {
               "thisisanotherstruct": {
                   "ID": 1,
                   "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
               }
           }
       ]
   }
}

What I would like to accomplish is getting the following:

{
   "hello": "world",
   "thisisastruct": {
       "thisisanarray": [
           {
               "thisisanotherstruct": {
                   "ID": 1,
                   "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
                   "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
               }
           }
       ]
   }
}

As I mentioned, the schemas can differ, so I may also get something like this:

{
   "hello": "world",
   "thisisastruct": {
       "thisisanarray": [
           {
               "thisisanotherstruct": {
                   "onemorestruct": {
                       "ID": 1,
                       "thisisthevaluetoenrcypt": "imthevaluetoencrypt"
                   }
               }
           }
       ],
       "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"
   }
}

And I would like to get something like this:

{
   "hello": "world",
   "thisisastruct": {
       "thisisanarray": [
           {
               "thisisanotherstruct": {
                   "onemorestruct": {
                       "ID": 1,
                       "thisisthevaluetoenrcypt": "imthevaluetoencrypt",
                       "thisisthevaluetoenrcypt_masked": "BNHFBYHTYBFDBY"
                   }
               }
           }
       ],
       "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt",
       "thisisanothervaluetoenrcypt_masked": "TYHRBVTRHTYJTJ"
   }
}

I have a Python function that encrypts a value, but I can't figure out how to change the struct dynamically. I think an approach like the one in the question below could help, but I have no Scala experience, so I can't translate it to PySpark or adapt it to add a new field instead of changing the existing value:

Change value of nested column in DataFrame

Any help would be greatly appreciated.

EDIT: This is the function I use to encrypt the data. I'm calling it through a UDF, but I can change that if needed:

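import base64
import boto3

def encrypt_string(s):
    # key_id (the KMS key ID or ARN) is assumed to be defined elsewhere
    kms = boto3.client('kms', region_name='us-west-2')
    response = kms.encrypt(
        KeyId=key_id,
        Plaintext=str(s).encode('utf-8')
    )
    # CiphertextBlob is raw bytes; base64-encode it so it can be stored as a string
    return base64.b64encode(response['CiphertextBlob']).decode('ascii')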
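`kms.encrypt` returns `CiphertextBlob` as raw bytes, and `json.dumps` cannot serialize bytes, so if this function feeds a pipeline that produces JSON strings, the ciphertext has to be turned into text first. Below is a minimal sketch of one common option, base64, using a made-up byte string in place of a real KMS response; the helper names `ciphertext_to_text` and `text_to_ciphertext` are hypothetical:

```python
import base64
import json


def ciphertext_to_text(blob: bytes) -> str:
    """Encode raw ciphertext bytes (e.g. a KMS CiphertextBlob) as ASCII base64 text."""
    return base64.b64encode(blob).decode("ascii")


def text_to_ciphertext(text: str) -> bytes:
    """Reverse of ciphertext_to_text, e.g. before handing the blob back to KMS."""
    return base64.b64decode(text)


# Stand-in for response['CiphertextBlob']; no AWS call is made here
fake_blob = b"\x01\x02\x03\xff"

try:
    json.dumps({"value_masked": fake_blob})
except TypeError as exc:
    print("raw bytes fail:", exc)

encoded = ciphertext_to_text(fake_blob)
print(json.dumps({"value_masked": encoded}))  # prints {"value_masked": "AQID/w=="}
assert text_to_ciphertext(encoded) == fake_blob
```

Decoding with `base64.b64decode` recovers the original bytes, which is what a later `kms.decrypt(CiphertextBlob=...)` call would expect.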

Answer:

Since your JSON documents can have widely differing schemas, parsing them into a Spark struct would require a schema that is the union of all possible schemas, which is unwieldy when you don't know in advance what the JSON will look like. Instead, I suggest keeping the JSON as a string and using a UDF that parses it into a dict, adds the masked values, and returns the result as a JSON string.

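import json
from typing import Any, Dict, List

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Reuse the active session (e.g. in a notebook) or start a local one
spark = SparkSession.builder.getOrCreate()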

data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}', ), 
        ('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}', ), ]

df = spark.createDataFrame(data, ("json_col", ))

def encrypt(s: str) -> str:
    # Placeholder for the real encryption; swap in a function that returns a str
    return f"encrypted_{s}"

def walk_dict(struct: Dict[str, Any], fields_to_encrypt: List[str]) -> None:
    # Iterate over a snapshot of the keys, because *_masked keys are added while looping
    for k in list(struct.keys()):
        if k in fields_to_encrypt and isinstance(struct[k], str):
            struct[f"{k}_masked"] = encrypt(struct[k])
        else:
            walk_fields(struct[k], fields_to_encrypt)

def walk_fields(field: Any, fields_to_encrypt: List[str]) -> None:
    # Recurse into objects and arrays; scalar values are left untouched
    if isinstance(field, dict):
        walk_dict(field, fields_to_encrypt)
    elif isinstance(field, list):
        for e in field:
            walk_fields(e, fields_to_encrypt)

def encrypt_fields(json_string: str) -> str:
    fields_to_encrypt = ["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"]
    as_json = json.loads(json_string)
    walk_fields(as_json, fields_to_encrypt)
    return json.dumps(as_json)

field_encryption_udf = F.udf(encrypt_fields, StringType())

df.withColumn("encrypted", field_encryption_udf(F.col("json_col"))).show(truncate=False)

Output

 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|json_col                                                                                                                                                                                                                    |encrypted                                                                                                                                                                                                                                                                                                                                                                  |
 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}]}}                                                                              |{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}]}}                                                                                                                                                          |
|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt"}}|{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"onemorestruct": {"ID": 1, "thisisthevaluetoenrcypt": "imthevaluetoencrypt", "thisisthevaluetoenrcypt_masked": "encrypted_imthevaluetoencrypt"}}}], "thisisanothervaluetoenrcypt": "imtheothervaluetoencrypt", "thisisanothervaluetoenrcypt_masked": "encrypted_imtheothervaluetoencrypt"}}|
 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
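In the answer above, both the field list and `encrypt` are hardcoded inside `encrypt_fields`. One possible refinement, sketched here under the assumption that you would rather pass them in, is a small factory that builds the JSON-string function; `make_encrypt_fields` and its `suffix` parameter are hypothetical names. It also passes `None` through, since a null cell would otherwise make `json.loads` raise inside the UDF:

```python
import json
from typing import Any, Callable, Iterable, Optional


def make_encrypt_fields(
    fields_to_encrypt: Iterable[str],
    encrypt: Callable[[str], str],
    suffix: str = "_masked",
) -> Callable[[Optional[str]], Optional[str]]:
    """Build a JSON-string -> JSON-string function that adds `<key><suffix>` siblings."""
    targets = frozenset(fields_to_encrypt)

    def walk(node: Any) -> None:
        if isinstance(node, dict):
            # Snapshot the keys, because masked keys are added while looping
            for key in list(node):
                value = node[key]
                if key in targets and isinstance(value, str):
                    node[key + suffix] = encrypt(value)
                else:
                    walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    def encrypt_fields(json_string: Optional[str]) -> Optional[str]:
        # Null cells in the Spark column arrive as None; pass them through unchanged
        if json_string is None:
            return None
        doc = json.loads(json_string)
        walk(doc)
        return json.dumps(doc)

    return encrypt_fields


# Plain-Python check with a toy "encryption" that reverses the string
mask = make_encrypt_fields(["secret"], lambda s: s[::-1])
print(mask('{"a": [{"secret": "abc"}], "secret": 5}'))
# prints {"a": [{"secret": "abc", "secret_masked": "cba"}], "secret": 5}
print(mask(None))  # prints None
```

In Spark it would be wrapped the same way as before, e.g. `F.udf(make_encrypt_fields(["thisisthevaluetoenrcypt", "thisisanothervaluetoenrcypt"], encrypt), StringType())`. Because the factory is plain Python, it can be tested without a Spark session.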