How to flatten JSON data into a Spark DataFrame


I am trying to convert two levels of nested JSON into a PySpark DataFrame. This is what my JSON schema looks like:

(schema screenshot from the original post, not reproduced here)

I always get nulls when converting to a Spark DataFrame for the products struct, which is the last level of the nested JSON.
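
Since the screenshot is unavailable, here is a purely illustrative record reconstructed from the field names used in the answers below; the values and exact casing are guesses:

{
  "b_Key": 1,
  "b_Code": "B1",
  "r_data": {
    "s_key": 10,
    "s_Code": "S1",
    "products": {
      "s_key": 100,
      "s_Code": "P1",
      "s_Type": "A",
      "r_type": "B",
      "sl": 1.5,
      "sp": 2
    }
  }
}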

CodePudding user response:

If the structure is fixed as shown in the description, then try this:

from pyspark.sql.functions import col

df.select(
    col("b_Code"),
    col("b_Key"),
    col("r_data.s_key"),
    col("r_data.s_Code"),
    col("r_data.products.s_key"),
    col("r_data.products.s_Code"),
    col("r_data.products.s_Type"),
    col("r_data.products.r_type"),
    col("r_data.products.sl"),
    col("r_data.products.sp"),
)

Note that s_key and s_Code appear at two levels, so the result will contain duplicate column names; add .alias(...) calls if that matters downstream.

Here is a function that will flatten a nested DataFrame regardless of the level of nesting in the JSON:


from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Stack holds (tuple of parent field names, dataframe projected to that level)
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        # Leaf columns at this level: select them by their full dotted path
        # and alias with underscores so the flattened names stay unique
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        # Struct columns still need expanding, so push them onto the stack
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    # The collected columns all use full paths from the root, so they can be
    # selected directly from the original dataframe
    return nested_df.select(columns)
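
As a quick, self-contained sanity check, here is a sketch that runs flatten_df on a small hand-built DataFrame; the sample record is made up to mirror the nesting described in the question (nested Row objects are inferred as structs):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical two-level nested record mirroring the question's field names
df = spark.createDataFrame([
    Row(
        b_Key=1,
        b_Code="B1",
        r_data=Row(
            s_key=10,
            s_Code="S1",
            products=Row(s_key=100, s_Code="P1", s_Type="A",
                         r_type="B", sl=1.5, sp=2),
        ),
    )
])

flatten_df(df).show()
# The underscore aliasing yields unique flat names such as b_Key, b_Code,
# r_data_s_key, r_data_s_Code, r_data_products_s_key, r_data_products_sl, ...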

CodePudding user response:

Have you tried forcing the schema?

You can try the following. Apparently the schema differs between files, so enforcing the proper schema should solve your problem:

from pyspark.sql import types as T


schema = T.StructType(
    [
        T.StructField("b_key", T.IntegerType()),
        T.StructField("b_code", T.StringType()),
        T.StructField(
            "r_date",
            T.StructType(
                [
                    T.StructField("s_key", T.IntegerType()),
                    T.StructField("s_code", T.StringType()),
                    T.StructField(
                        "products",
                        T.StructType(
                            [
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_code", T.StringType()),
                                T.StructField("s_type", T.StringType()),
                                T.StructField("r_type", T.StringType()),
                                T.StructField("sl", T.DecimalType()),
                                T.StructField("sp", T.IntegerType()),
                            ]
                        ),
                    ),
                ]
            ),
        ),
    ]
)


df = spark.read.json("path/to/file.json", schema=schema)
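
Once the file is read, it is worth verifying that the structs actually parsed; under an enforced schema, fields that do not match come back as null in the default PERMISSIVE mode, which is exactly the symptom described in the question:

df.printSchema()         # r_data and r_data.products should show as structs
df.show(truncate=False)  # nulls here mean the JSON did not match the schema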

From there, you do not have any arrays, so you can simply select the nested columns to flatten them. For example:

df.select(
    "r_data.*"
)

This will flatten the r_data struct column, and you will end up with three columns: s_key, s_code, and products.
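
If you also want the product fields at the top level in a single pass, you can expand both structs at once. This is a sketch; the r_s_key / r_s_code alias names are arbitrary, and the aliases are needed because s_key and s_code exist at two levels:

from pyspark.sql.functions import col

flat = df.select(
    "b_key",
    "b_code",
    col("r_data.s_key").alias("r_s_key"),    # aliased to avoid a name clash
    col("r_data.s_code").alias("r_s_code"),  # with the product-level fields
    "r_data.products.*",
)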
