Exploding a key not present in JSON in PySpark


I have a JSON input containing an array that I explode as follows:

new_df = df \
    .withColumn("x", explode_outer(col("x"))) \
    .select(
        col("x.p").alias("xp"),
        col("x.q").alias("xq"),
        col("x.r.l.g").alias("xrlg"),
        col("x.r.m.f").alias("xrmf"),
        col("x.r.n").alias("xrn"),
        col("x.r.o").alias("xro"),
        col("x.r.s").alias("xrs"),
    )

Sometimes the input file may be empty or may not contain the JSON key 'x'. In such cases the PySpark code fails with: cannot resolve 'x' given input columns: [].

Is there a way to keep all the columns of this table and populate them all with NULL if this key is not present in the input JSON?

CodePudding user response:

Simply check whether the column exists in the df using a has_column function (written out below for reference).

from pyspark.sql.functions import col, explode_outer, lit
from pyspark.sql.utils import AnalysisException


def has_column(df, col_name):
    # Try to resolve the (possibly nested) column; Spark raises an
    # AnalysisException when it cannot be found in the schema.
    try:
        df[col_name]
        return True
    except AnalysisException:
        return False


if has_column(df, "x"):
    new_df = df.withColumn("x", explode_outer(col("x"))) \
        .select(
            col("x.p").alias("xp"),
            col("x.q").alias("xq"),
            col("x.r.l.g").alias("xrlg"),
            col("x.r.m.f").alias("xrmf"),
            col("x.r.n").alias("xrn"),
            col("x.r.o").alias("xro"),
            col("x.r.s").alias("xrs"))
else:
    new_df = df.withColumn("xp", lit(None).cast("string"))
               .withColumn("xq", lit(None).cast("string"))
               .withColumn("xrlg", lit(None).cast("string"))
               .withColumn("xrmf", lit(None).cast("string"))
               .withColumn("xrn", lit(None).cast("string"))
               .withColumn("xro", lit(None).cast("string"))
               .withColumn("xrs", lit(None).cast("string")) 

If you also want to check whether inner JSON keys are present, you can do something like the following for each column:

# has_column runs on the driver, so branch in Python: building
# col("x.p") when the field is absent would itself fail analysis.
df = df.withColumn(
    "xp",
    col("x.p") if has_column(df, "x.p") else lit(None).cast("string"))
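The same per-field check can be rolled into a single select. A sketch covering all seven columns from the question (the alias-to-path mapping comes from the original select; the string types are an assumption):

from pyspark.sql.functions import col, explode_outer, lit

# Map output aliases to the nested paths used in the question.
targets = {
    "xp": "x.p",
    "xq": "x.q",
    "xrlg": "x.r.l.g",
    "xrmf": "x.r.m.f",
    "xrn": "x.r.n",
    "xro": "x.r.o",
    "xrs": "x.r.s",
}

# Explode only when 'x' exists; otherwise keep df untouched.
exploded = (df.withColumn("x", explode_outer(col("x")))
            if has_column(df, "x") else df)

# For each target, select the field when present, NULL otherwise.
new_df = exploded.select(
    *[(col(path) if has_column(exploded, path)
       else lit(None).cast("string")).alias(alias)
      for alias, path in targets.items()])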

Another solution is to provide a schema before reading the JSON file, as suggested by hristo iliev:

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = StructType([
    StructField("x", ArrayType(StructType([   # x is an array, matching the explode above
        StructField("p", StringType()),
        StructField("q", StringType()),
        ....
    ])))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)
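For reference, a full schema covering every field used in the question might look like the sketch below (all leaf types are assumptions; substitute your actual types, and spark is assumed to be an active SparkSession). With the schema enforced, 'x' always resolves, so the original explode/select runs unchanged and absent data simply comes through as NULLs:

from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import (ArrayType, StringType, StructField,
                               StructType)

# All leaf fields are assumed to be strings; 'x' is an array so that
# explode_outer applies, as in the question.
schema = StructType([
    StructField("x", ArrayType(StructType([
        StructField("p", StringType()),
        StructField("q", StringType()),
        StructField("r", StructType([
            StructField("l", StructType([StructField("g", StringType())])),
            StructField("m", StructType([StructField("f", StringType())])),
            StructField("n", StringType()),
            StructField("o", StringType()),
            StructField("s", StringType()),
        ])),
    ])))
])

df = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

new_df = df.withColumn("x", explode_outer(col("x"))).select(
    col("x.p").alias("xp"),
    col("x.q").alias("xq"),
    col("x.r.l.g").alias("xrlg"),
    col("x.r.m.f").alias("xrmf"),
    col("x.r.n").alias("xrn"),
    col("x.r.o").alias("xro"),
    col("x.r.s").alias("xrs"))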