How to make JSON flattening memory efficient?


Problem Statement: I have around 500 ZIP files, each containing many XML files. I can convert them to JSON and parse them into Parquet files, as shown in the example below for one nested JSON file.

I am also unable to process multiple files with Spark.

The code below flattens a whole JSON document into a pandas DataFrame, but I now have to run it over 150,000 files. When a JSON document is very large, flattening takes around 2 minutes. And if I run it with Spark over an RDD of multiple files, it fails with either an OOM or a struct error.

Am I doing something wrong Spark-wise?

import io
import zipfile

import xmltodict
import pandas as pd

def parser(master_tree):
  flatten_tree_node = []
  def _process_leaves(tree: dict, prefix: str = "node", tree_node: dict = dict(), update: bool = True):
      is_nested = False
      if isinstance(tree, dict):
        for k in tree.keys():
            if type(tree[k]) == str:
                colName = prefix + "_" + k
                tree_node[colName] = tree[k]
            elif type(tree[k]) == dict:
                prefix += "_" + k
                leave = tree[k]
                _process_leaves(leave, prefix=prefix, tree_node=tree_node, update=False)
        for k in tree.keys():
            if type(tree[k]) == list:
                is_nested = True
                prefix += "_" + k
                for leave in tree[k]:
                    _process_leaves(leave, prefix=prefix, tree_node=tree_node.copy())
        if not is_nested and update:
            flatten_tree_node.append(tree_node)

  _process_leaves(master_tree)
  df = pd.DataFrame(flatten_tree_node)
  df.columns = df.columns.str.replace("@", "_")
  df.columns = df.columns.str.replace("#", "_")
  return df


def extractor(file_name, file):
    data = file.decode('utf-8')
    d = bytes(bytearray(data, encoding='utf-8'))
    data = xmltodict.parse(d)
    flatten_data = parser(data)
    return (file_name, flatten_data)

def extract_files(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = file_obj.namelist()
    return [extractor(file_name, file_obj.open(file_name).read()) for file_name in files]
    
zip_rdd = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path','content').rdd

It fails here, at collect time:

collected_data = zip_rdd.map(extract_files).collect()

It fails with one of the errors below:

org.apache.spark.api.python.PythonException: 'struct.error: 'i' format requires -2147483648 <= number <= 2147483647'
 

or

java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123

Everything works fine, though, when run on a single file.
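Incidentally, the in-memory ZIP handling used by extract_files can be exercised with the standard library alone (synthetic file names and payloads here, no xmltodict involved), which helps confirm that this part works on a single archive:

```python
import io
import zipfile

# Build a tiny ZIP entirely in memory, mirroring the raw bytes Spark's
# binaryFile reader would hand to extract_files.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.xml", "<root><id>1</id></root>")
    zf.writestr("b.xml", "<root><id>2</id></root>")

# Re-open the bytes exactly as extract_files does.
file_obj = zipfile.ZipFile(io.BytesIO(buf.getvalue()), "r")
names = file_obj.namelist()
payload = file_obj.open("a.xml").read()
```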

An example run of the parser function on nested JSON looks like this:

Is there a way to make it memory- and speed-efficient?

import pandas as pd
tree=     {
    "products":
    [

        {
            "id":"0",
            "name": "First",
            "emptylist":[],
            "properties" : 
            {
              "id" : "",
              "name" : ""
            }
        },
        {
            "id":"1",
            "name": "Second",
            "emptylist":[],
            "properties": 
            {
                "id" : "23",
                "name" : "a useful product",
                "features" :
                [
                    {
                        "name":"Features",
                        "id":"18",
                        "features":
                        [
                            {
                                "id":"1001",
                                "name":"Colour",
                                "value":"Black"
                            },
                            {
                                "id":"2093",
                                "name":"Material",
                                "value":"Plastic"
                            }
                        ]
                    },
                    {
                        "name":"Sizes",
                        "id":"34",
                        "features":
                        [
                            {
                                "id":"4736",
                                "name":"Length",
                                "value":"56"
                            },
                            {
                                "id":"8745",
                                "name":"Width",
                                "value":"76"
                            }
                        ]
                    }
                ]
            }
        },
        {
            "id":"2",
            "name": "Third",
            "properties" : 
            {
                "id" : "876",
                "name" : "another one",
                "features" : 
                [
                    {
                        "name":"Box",
                        "id":"937",
                        "features":
                        [
                            {
                                "id":"3758",
                                "name":"Amount",
                                "value":"1"
                            },
                            {
                                "id":"2222",
                                "name":"Packaging",
                                "value":"Blister"
                            }
                        ]
                    },
                    {
                        "name":"Features",
                        "id":"8473",
                        "features":
                        [
                            {
                                "id":"9372",
                                "name":"Colour",
                                "value":"White"
                            },
                            {
                                "id":"9375",
                                "name":"Position",
                                "value":"A"
                            },
                            {
                                "id":"2654",
                                "name":"Amount",
                                "value":"6"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}


def parser(master_tree):
  flatten_tree_node = []
  def _process_leaves(tree: dict, prefix: str = "node", tree_node: dict = dict(), update: bool = True):
      is_nested = False
      if isinstance(tree, dict):
        for k in tree.keys():
            if type(tree[k]) == str:
                colName = prefix + "_" + k
                tree_node[colName] = tree[k]
            elif type(tree[k]) == dict:
                prefix += "_" + k
                leave = tree[k]
                _process_leaves(leave, prefix=prefix, tree_node=tree_node, update=False)
        for k in tree.keys():
            if type(tree[k]) == list:
                is_nested = True
                prefix += "_" + k
                for leave in tree[k]:
                    _process_leaves(leave, prefix=prefix, tree_node=tree_node.copy())
        if not is_nested and update:
            flatten_tree_node.append(tree_node)

  _process_leaves(master_tree)
  df = pd.DataFrame(flatten_tree_node)
  df.columns = df.columns.str.replace("@", "_")
  df.columns = df.columns.str.replace("#", "_")
  return df

print(parser(tree))

  node_products_id node_products_name  ... node_products_properties_features_features_name node_products_properties_features_features_value
0                1             Second  ...                                          Colour                                            Black
1                1             Second  ...                                        Material                                          Plastic
2                1             Second  ...                                          Length                                               56
3                1             Second  ...                                           Width                                               76
4                2              Third  ...                                          Amount                                                1
5                2              Third  ...                                       Packaging                                          Blister
6                2              Third  ...                                          Colour                                            White
7                2              Third  ...                                        Position                                                A
8                2              Third  ...                                          Amount                                                6
9                2              Third  ...                                             NaN                                              NaN

[10 rows x 9 columns]
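The flattening logic can also be sanity-checked on a much smaller input. The sketch below is a condensed restatement of parser (same recursion, but with isinstance checks and a non-mutated prefix), just to pin down the expected column names:

```python
import pandas as pd

def parser_small(master_tree):
    rows = []
    def walk(tree, prefix="node", row=None, update=True):
        if row is None:
            row = {}
        is_nested = False
        if isinstance(tree, dict):
            for k, v in tree.items():
                if isinstance(v, str):
                    row[prefix + "_" + k] = v        # leaf value -> column
                elif isinstance(v, dict):
                    walk(v, prefix + "_" + k, row, update=False)
            for k, v in tree.items():
                if isinstance(v, list):
                    is_nested = True
                    for leaf in v:                   # one output row per list element
                        walk(leaf, prefix + "_" + k, row.copy())
            if not is_nested and update:
                rows.append(row)
    walk(master_tree)
    return pd.DataFrame(rows)

tiny = {"items": [{"id": "1", "meta": {"name": "a"}},
                  {"id": "2", "meta": {"name": "b"}}]}
df = parser_small(tiny)
# -> columns node_items_id and node_items_meta_name, one row per item
```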

CodePudding user response:

Do not collect this data; it will likely never fit in memory, since you are pulling all of it into the driver.

You can save it to files directly instead:

collected_data = zip_rdd.map(extract_files).toDF("column","names","go","here")
collected_data.write.parquet("/path/to/folder")
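One caveat with this approach: extract_files returns a list of records per ZIP, so zip_rdd.map(extract_files) yields an RDD of lists, while toDF expects one record per element; flatMap (zip_rdd.flatMap(extract_files)) gives that shape. A plain-Python sketch of the difference, with a hypothetical extract_files_stub standing in for the real function:

```python
from itertools import chain

def extract_files_stub(zip_blob):
    # Stand-in for extract_files: pretend each ZIP yields two records.
    return [(zip_blob + "/a.xml", "row-a"), (zip_blob + "/b.xml", "row-b")]

zips = ["z1", "z2"]

# map -> one list element per ZIP (what zip_rdd.map(extract_files) produces)
mapped = list(map(extract_files_stub, zips))

# flatMap equivalent -> one element per record, the shape toDF expects
flat = list(chain.from_iterable(map(extract_files_stub, zips)))
```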

CodePudding user response:

I do not have Spark 3.2, but I'm aware of the features it possesses, and in this case one of them will make your life easy: unionByName with allowMissingColumns=True lets you merge DataFrames whose schemas differ.

collected_data = spark.createDataFrame(data=[], schema=[])

zip_array = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path').collect()  # this will likely fit in driver memory, so it's OK to collect: after all, it's just a list of file paths.

for my_file in zip_array:
    collected_data = collected_data.unionByName(spark.createDataFrame(extract_files(my_file)), allowMissingColumns=True)
collected_data.write.parquet("/path/to/folder")
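For intuition, the column alignment that unionByName(..., allowMissingColumns=True) performs has a pure-pandas analogue: pd.concat also aligns columns by name and fills the gaps with NaN. A minimal sketch with made-up column names:

```python
import pandas as pd

# Two frames with partially overlapping columns, like the per-file
# DataFrames produced by parser().
df1 = pd.DataFrame([{"node_id": "1", "node_name": "First"}])
df2 = pd.DataFrame([{"node_id": "2", "node_colour": "Black"}])

# Columns are aligned by name; missing values become NaN,
# analogous to unionByName(..., allowMissingColumns=True).
merged = pd.concat([df1, df2], ignore_index=True)
```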