I am trying to convert JSON with two levels of nesting into a PySpark DataFrame. My JSON schema looks like this:
I always get nulls for the products struct, which is the innermost level of the nested JSON, when converting to a Spark DataFrame.
CodePudding user response:
If the structure is fixed as shown in the description, try this:
df.select(
    "b_Code",
    "b_Key",
    "r_data.s_key",
    "r_data.s_Code",
    "r_data.products.s_key",
    "r_data.products.s_Code",
    "r_data.products.s_Type",
    "r_data.products.r_type",
    "r_data.products.sl",
    "r_data.products.sp",
)
Here is a function that flattens a nested DataFrame regardless of how deeply the JSON is nested:
from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Each stack entry pairs the parent column names with the DataFrame projected at that level.
    stack = [((), nested_df)]
    columns = []
    while len(stack) > 0:
        parents, df = stack.pop()
        # Non-struct columns are selected by dotted path and aliased with underscores.
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        columns.extend(flat_cols)
        # Struct columns are expanded one level and pushed back for another pass.
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
    return nested_df.select(columns)
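To see what the traversal in flatten_df produces without starting Spark, here is a minimal pure-Python sketch of the same stack-based walk. It is only an illustration: `flatten_paths` and `example_schema` are hypothetical names, and a plain dict stands in for the Spark struct (a dict value means a nested struct, anything else is a leaf type).

```python
def flatten_paths(schema):
    """Return (dotted_path, alias) pairs for every leaf field of a nested schema.

    `schema` is a plain dict standing in for a Spark struct: a dict value is a
    nested struct, any other value is treated as a leaf type name.
    """
    stack = [((), schema)]
    pairs = []
    while stack:
        parents, fields = stack.pop()
        for name, dtype in fields.items():
            path = parents + (name,)
            if isinstance(dtype, dict):
                # Nested struct: revisit it later with the extended parent path.
                stack.append((path, dtype))
            else:
                # Leaf: dotted path to select, underscore-joined name as alias.
                pairs.append((".".join(path), "_".join(path)))
    return pairs


# Hypothetical schema, modelled on the field names used in this thread.
example_schema = {
    "b_key": "int",
    "b_code": "string",
    "r_data": {
        "s_key": "int",
        "s_code": "string",
        "products": {"s_key": "int", "sl": "decimal"},
    },
}

pairs = flatten_paths(example_schema)
for path, alias in pairs:
    print(path, "->", alias)
```

Each pair corresponds to one `col(path).alias(alias)` expression in flatten_df, which is why the two `s_key` fields end up as distinct columns (`r_data_s_key` and `r_data_products_s_key`).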
CodePudding user response:
Have you tried enforcing the schema?
Apparently the schema differs between your files, so enforcing the proper schema should solve your problem:
from pyspark.sql import types as T

schema = T.StructType(
    [
        T.StructField("b_key", T.IntegerType()),
        T.StructField("b_code", T.StringType()),
        T.StructField(
            "r_data",
            T.StructType(
                [
                    T.StructField("s_key", T.IntegerType()),
                    T.StructField("s_code", T.StringType()),
                    T.StructField(
                        "products",
                        T.StructType(
                            [
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_code", T.StringType()),
                                T.StructField("s_type", T.StringType()),
                                T.StructField("r_type", T.StringType()),
                                T.StructField("sl", T.DecimalType()),
                                T.StructField("sp", T.IntegerType()),
                            ]
                        ),
                    ),
                ]
            ),
        ),
    ]
)

df = spark.read.json("path/to/file.json", schema=schema)
From there, since you do not have any arrays, you can simply select
the nested columns to flatten them. For example:
df.select(
    "r_data.*"
)
This will flatten the r_data struct column, and you will end up with 3 columns.
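One thing to watch for if you go on to expand products the same way: r_data and products both contain s_key and s_code, so the flattened result would have duplicate column names. A possible sketch, assuming the field names from the schema above, is to build aliased expressions and pass them to `selectExpr`. The helper names (`flat_select_exprs`, `flatten_two_levels`) and the `products_` prefix are my own choices, not anything required by Spark.

```python
# Field names taken from the schema in this answer; adjust them to your data.
TOP_FIELDS = ["b_key", "b_code"]
R_DATA_FIELDS = ["s_key", "s_code"]
PRODUCT_FIELDS = ["s_key", "s_code", "s_type", "r_type", "sl", "sp"]


def flat_select_exprs():
    """Build SQL expressions that give every leaf column a unique name."""
    exprs = list(TOP_FIELDS)
    exprs += [f"r_data.{f} AS {f}" for f in R_DATA_FIELDS]
    # Prefix the innermost fields so they do not clash with the r_data ones.
    exprs += [f"r_data.products.{f} AS products_{f}" for f in PRODUCT_FIELDS]
    return exprs


def flatten_two_levels(df):
    """Apply the expressions to a DataFrame that has the nested layout above."""
    return df.selectExpr(*flat_select_exprs())
```

Calling `flatten_two_levels(df)` on the DataFrame read with the enforced schema should then give one flat column per leaf field, with the product fields named `products_s_key`, `products_s_code`, and so on.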