Problem Statement: I have around 500 ZIP files with lots of XMLS, i am able to convert them to JSON and parse them to parquet files as example below for one nested JSON file.
Not able to process multiple files with spark also
I have below code that flattens whole JSON into pandas data frame but now have to run this code over 150,000 files. when my JSON is very big it takes around 2 minutes to flatten whole data. Also if i run it using SPARK over my RDD of multiple files it fails with either OOM or struct error.
Am i doing something wrong SPARK wise ?
import xmltodict
import pandas as pd
def parser(master_tree):
flatten_tree_node = []
def _process_leaves(tree:dict,prefix:str = "node", tree_node:dict = dict(), update:bool = True):
is_nested = False
if isinstance(tree,dict):
for k in tree.keys():
if type(tree[k]) == str:
colName = prefix "_" k
tree_node[colName] = tree[k]
elif type(tree[k]) == dict:
prefix = "_" k
leave = tree[k]
_process_leaves(leave,prefix = prefix, tree_node = tree_node, update = False)
for k in tree.keys():
if type(tree[k]) == list:
is_nested = True
prefix = "_" k
for leave in tree[k]:
_process_leaves(leave,prefix = prefix, tree_node = tree_node.copy())
if not is_nested and update:
flatten_tree_node.append(tree_node)
_process_leaves(master_tree)
df = pd.DataFrame(flatten_tree_node)
df.columns = df.columns.str.replace("@", "_")
df.columns = df.columns.str.replace("#", "_")
return df
def extractor(file_name,file):
data = file.decode('utf-8')
d = bytes(bytearray(data, encoding='utf-8'))
data = xmltodict.parse(d)
flatten_data = parser(dict_data)
return (file_name,flatten_data)
def extract_files(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return [extractor(file_name,file_obj.open(file_name).read()) for file_name in files]
zip_rdd = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path','content').rdd
Fails here at the time of collection:
collected_data = zip_rdd.map(extract_files).collect()
Below Errors:
org.apache.spark.api.python.PythonException: 'struct.error: 'i' format requires -2147483648 <= number <= 2147483647'. Full traceback
or
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123
Although everything works fine when ran one only single file.
Example Run of parsing nested JSON using parser function is like below:
Is there a way to make it memory and speed efficient ?
import pandas as pd
tree= {
"products":
[
{
"id":"0",
"name": "First",
"emptylist":[],
"properties" :
{
"id" : "",
"name" : ""
}
},
{
"id":"1",
"name": "Second",
"emptylist":[],
"properties":
{
"id" : "23",
"name" : "a useful product",
"features" :
[
{
"name":"Features",
"id":"18",
"features":
[
{
"id":"1001",
"name":"Colour",
"value":"Black"
},
{
"id":"2093",
"name":"Material",
"value":"Plastic"
}
]
},
{
"name":"Sizes",
"id":"34",
"features":
[
{
"id":"4736",
"name":"Length",
"value":"56"
},
{
"id":"8745",
"name":"Width",
"value":"76"
}
]
}
]
}
},
{
"id":"2",
"name": "Third",
"properties" :
{
"id" : "876",
"name" : "another one",
"features" :
[
{
"name":"Box",
"id":"937",
"features":
[
{
"id":"3758",
"name":"Amount",
"value":"1"
},
{
"id":"2222",
"name":"Packaging",
"value":"Blister"
}
]
},
{
"name":"Features",
"id":"8473",
"features":
[
{
"id":"9372",
"name":"Colour",
"value":"White"
},
{
"id":"9375",
"name":"Position",
"value":"A"
},
{
"id":"2654",
"name":"Amount",
"value":"6"
}
]
}
]
}
}
]
}
def parser(master_tree):
flatten_tree_node = []
def _process_leaves(tree:dict,prefix:str = "node", tree_node:dict = dict(), update:bool = True):
is_nested = False
if isinstance(tree,dict):
for k in tree.keys():
if type(tree[k]) == str:
colName = prefix "_" k
tree_node[colName] = tree[k]
elif type(tree[k]) == dict:
prefix = "_" k
leave = tree[k]
_process_leaves(leave,prefix = prefix, tree_node = tree_node, update = False)
for k in tree.keys():
if type(tree[k]) == list:
is_nested = True
prefix = "_" k
for leave in tree[k]:
_process_leaves(leave,prefix = prefix, tree_node = tree_node.copy())
if not is_nested and update:
flatten_tree_node.append(tree_node)
_process_leaves(master_tree)
df = pd.DataFrame(flatten_tree_node)
df.columns = df.columns.str.replace("@", "_")
df.columns = df.columns.str.replace("#", "_")
return df
print(parser(tree))
node_products_id node_products_name ... node_products_properties_features_features_name node_products_properties_features_features_value
0 1 Second ... Colour Black
1 1 Second ... Material Plastic
2 1 Second ... Length 56
3 1 Second ... Width 76
4 2 Third ... Amount 1
5 2 Third ... Packaging Blister
6 2 Third ... Colour White
7 2 Third ... Position A
8 2 Third ... Amount 6
9 2 Third ... NaN NaN
[10 rows x 9 columns]
CodePudding user response:
Do not collect
this data, it's likely it will never fit in memory as you are trying to pull all the data into the driver.
You can just save it to a file directly.
collected_data = zip_rdd.map(extract_files).toDF("column","names","go","here")
collected_data.write.parquet("/path/to/folder")
CodePudding user response:
I do not have spark 3.2 but I'm aware of the features it posses. And in this case it will make your life easy. unionByName
is a new feature that will let you magically join schemas.
collected_data = spark.createDataFrame( data = [], schema = [] )
zip_array = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path').collect() # this will likely fit in driver memory so it's OK to call. After all it's just a list of file paths.
for my_file in zip_array:
collected_data = collected_data.unionByName( spark.createDataFrame(extract_files(my_file)), allowMissingColumns=True )
collected_data.write.parquet("/path/to/folder")