I am analysing some JSON data in Palantir Foundry using Pyspark. The source is a 30MB uploaded JSON file containing four elements, one of which holds a table of some 60 columns and 20,000 rows. Some of the columns in this table are strings that contain HTML entities representing UTF characters (other columns are numeric or boolean). I want to clean these strings to replace the entities with the corresponding characters.
I realise that I can apply html.unescape(my_str) in a UDF to the string columns once all the JSON data has been converted into dataframes. However, this sounds inefficient. This answer suggests it would be better to process the whole JSON file in one go, before converting it to a dataframe. However, my current code uses spark_session.read.json() to go automatically from the raw file to a dataframe with a proper schema. I can't see how to modify it to include the unescape() stage without ending up with everything as StringType(), and I don't want to have to manually code the schema for every column of the nested data structure. My current code to read the JSON into a dataframe looks like this:
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from transforms.api import transform, Input, Output, Check

@transform(
    parsed_output=Output("out_path"),
    raw_file_input=Input("in_path")
)
def read_json(ctx, parsed_output, raw_file_input):
    filesystem = raw_file_input.filesystem()
    hadoop_path = filesystem.hadoop_path
    paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    df = ctx.spark_session.read.json(paths)
    parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
How can I adapt this to unescape() the plain text of the JSON before it is parsed? Or would it actually be likely to be more efficient to unescape row by row and column by column later, using the parallelism in Spark, than trying to process the data as a single 30MB-long string?
An example of my JSON input format. The real input is about 30MB long and not pretty-printed. The Data structure has many more rows, and around 60 columns. String columns are mixed in between numeric and boolean columns, in no particular order:
{
"Data": [
{"Id": 1, "Lots": "more", "data": "of", "different": "types", "Flag1": true, "RefNumber": 17},
{"Id": 2, "Lots": "of the string", "data": "includes entities like &le; in", "different": "places", "Flag1": false, "RefNumber": 17781}
],
"TotalRows":2,
"Warnings":null,
"Errors":null
}
The final expected output from the above would end up as the below (I don't have any problem with processing the JSON into the right columns, it's just efficiently converting the entities to characters that is an issue). Note the math symbol ≤ in the 'data' field in row 2, rather than the entity &le;:
Id | Lots            | data                          | different | Flag1 | RefNumber
---|-----------------|-------------------------------|-----------|-------|----------
1  | "more"          | "of"                          | "types"   | true  | 17
2  | "of the string" | "includes entities like ≤ in" | "places"  | false | 17781
CodePudding user response:
You can try adding a withColumn(cnameNew, udf_unescape(columnWithHtml)). This should help you out.
Your UDF could parse the HTML into a dictionary, build a StructType from it, and return that. This should give you a complex-typed column and aid in further processing.
If you could share a sample of the data then I could extend on this answer.
CodePudding user response:
Broadly speaking, the answer that you linked (How to decode HTML entities in Spark?) is the right approach here. In order to achieve what you want (the convenience of letting Spark automagically infer a schema for you), here's how I would implement this in Foundry.
- As a first step, do something like this (note: I didn't actually run this code, so there might be a trivial mistake or two, but hopefully it illustrates the idea)
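import json

from transforms.api import transform, Input, Output


@transform(
    output_json=Output("/NAMESPACE/PROJECT/datasets/preprocess/escaped_json"),
    input_json=Input("/NAMESPACE/PROJECT/datasets/preprocess/raw_json"),
)
def compute_function(output_json, input_json):
    input_fs = input_json.filesystem()
    output_fs = output_json.filesystem()

    def process_file(file_status):
        # Parse the whole file so the numeric and boolean values keep their JSON types.
        with input_fs.open(file_status.path) as input_fp:
            json_data = json.load(input_fp)
        # clean_up_json_in_place is a helper you need to write yourself:
        # it should unescape the HTML entities in every string value.
        clean_up_json_in_place(json_data)
        with output_fs.open(file_status.path, "w") as output_fp:
            json.dump(json_data, output_fp)

    # Run process_file once for each file in the input dataset.
    input_fs.files().foreach(process_file)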
- As a second step, do exactly what you're doing now, but make the input /NAMESPACE/PROJECT/datasets/preprocess/escaped_json
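The first step calls clean_up_json_in_place without defining it. One possible shape for that helper is sketched below, assuming it only needs to walk the parsed JSON and run html.unescape on every string value. The function body and the sample data are my own illustration, not something I have run inside Foundry. It leaves dictionary keys, numbers, booleans and nulls untouched, so Spark's schema inference in the second step should see the same types as before.

```python
import html


def clean_up_json_in_place(node):
    """Recursively unescape HTML entities in every string value of parsed JSON."""
    if isinstance(node, dict):
        items = list(node.items())
    elif isinstance(node, list):
        items = list(enumerate(node))
    else:
        # Scalars at the top level cannot be modified in place.
        return
    for key, value in items:
        if isinstance(value, str):
            node[key] = html.unescape(value)
        else:
            # Nested dicts and lists are walked; numbers, booleans and None are skipped.
            clean_up_json_in_place(value)


# Hypothetical sample modelled on the question's input format.
sample = {
    "Data": [
        {"Id": 2, "data": "includes entities like &le; in", "Flag1": False, "RefNumber": 17781}
    ],
    "TotalRows": 1,
    "Warnings": None,
}
clean_up_json_in_place(sample)
```

Since the file is only around 30MB, parsing it with json.load and walking it once like this should be cheap compared with the overhead of a Python UDF applied column by column.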