I have a JSON input which consists of an array that will be exploded as follows:
new_df = df \
    .withColumn("x", explode_outer(col("x"))) \
    .select(
        col("x.p").alias("xp"),
        col("x.q").alias("xq"),
        col("x.r.l.g").alias("xrlg"),
        col("x.r.m.f").alias("xrmf"),
        col("x.r.n").alias("xrn"),
        col("x.r.o").alias("xro"),
        col("x.r.s").alias("xrs"),
    )
Sometimes the input file is empty or does not contain the JSON key 'x'. In those cases the PySpark code fails with the error: cannot resolve 'x' given input columns: [].
Is there a way I can keep all the columns of this table and populate them all as NULL if this key is not present in the input JSON?
Answer:
Check whether the column exists in the DataFrame with a has_column helper (reproduced below for reference) and branch on the result.
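from pyspark.sql.functions import col, explode_outer, lit, when
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException

def has_column(df, col_name):
    # Looking up a column that cannot be resolved raises AnalysisException.
    try:
        df[col_name]
        return True
    except AnalysisException:
        return False
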
if has_column(df, "x"):
    new_df = df.withColumn("x", explode_outer(col("x"))) \
        .select(
            col("x.p").alias("xp"),
            col("x.q").alias("xq"),
            col("x.r.l.g").alias("xrlg"),
            col("x.r.m.f").alias("xrmf"),
            col("x.r.n").alias("xrn"),
            col("x.r.o").alias("xro"),
            col("x.r.s").alias("xrs"))
else:
    # "x" is missing: add each output column as a typed NULL instead.
    new_df = df.withColumn("xp", lit(None).cast("string")) \
        .withColumn("xq", lit(None).cast("string")) \
        .withColumn("xrlg", lit(None).cast("string")) \
        .withColumn("xrmf", lit(None).cast("string")) \
        .withColumn("xrn", lit(None).cast("string")) \
        .withColumn("xro", lit(None).cast("string")) \
        .withColumn("xrs", lit(None).cast("string"))
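The two branches above repeat the same seven aliases. One way to keep them in sync, sketched here as an assumption and not as part of the original answer, is to hold the alias-to-path mapping in a single dictionary and derive both branches from it. `ALIASES` and `select_plan` are hypothetical names. The function is plain Python and only decides, per alias, whether to read the nested path or fall back to NULL.

```python
# Alias -> nested path, copied from the select() in the question.
ALIASES = {
    "xp": "x.p",
    "xq": "x.q",
    "xrlg": "x.r.l.g",
    "xrmf": "x.r.m.f",
    "xrn": "x.r.n",
    "xro": "x.r.o",
    "xrs": "x.r.s",
}


def select_plan(path_available, aliases=ALIASES):
    """Return (alias, path_or_None) pairs in the mapping's order.

    `path_available` is any callable that takes a dotted path and returns
    True or False, for example: lambda p: has_column(df, p).
    A None path means "emit a typed NULL for this alias".
    """
    return [(alias, path if path_available(path) else None)
            for alias, path in aliases.items()]


# With PySpark available, the plan could be turned into columns like this
# (df is assumed to be already exploded when "x" exists):
#   cols = [col(p).alias(a) if p else lit(None).cast("string").alias(a)
#           for a, p in select_plan(lambda p: has_column(df, p))]
#   new_df = df.select(*cols)

# Stand-in check for input that has x.p and x.q but nothing under x.r.
plan = select_plan(lambda p: not p.startswith("x.r"))
print(plan)
```

Because the decision is made per path, this also covers the case where `x` exists but some inner keys do not.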
If you also want to check whether the inner JSON keys are present, you can do something like the following for each column:
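# Decide in Python rather than with when(): Spark resolves col("x.p") even inside
# a branch that is never taken, so a missing key would still raise AnalysisException.
df.withColumn(
    "xp",
    col("x.p") if has_column(df, "x.p") else lit(None).cast("string"))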
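The same presence check can also be done by inspecting the schema instead of catching an exception. The sketch below is an assumption, not part of the original answer: `path_exists` is a hypothetical helper that walks a schema in dictionary form, the shape returned by `df.schema.jsonValue()`, and reports whether a dotted path such as `x.r.n` resolves, looking through arrays to their element type. It is plain Python, so it can be tried without a Spark session. It does not handle field names that themselves contain dots.

```python
def path_exists(schema_json, path):
    """Return True if dotted `path` resolves in a schema dict shaped like
    the output of df.schema.jsonValue(). Hypothetical helper."""
    node = schema_json
    for name in path.split("."):
        # Look through arrays (including nested arrays) to the element type.
        while isinstance(node, dict) and node.get("type") == "array":
            node = node["elementType"]
        # Only structs have named fields; primitives appear as plain strings.
        if not (isinstance(node, dict) and node.get("type") == "struct"):
            return False
        for field in node["fields"]:
            if field["name"] == name:
                node = field["type"]
                break
        else:
            return False  # no field with this name at this level
    return True


# Hand-written example schema: x is an array of structs holding p and r.n.
example_schema = {
    "type": "struct",
    "fields": [
        {"name": "x", "nullable": True, "metadata": {}, "type": {
            "type": "array", "containsNull": True, "elementType": {
                "type": "struct",
                "fields": [
                    {"name": "p", "type": "string",
                     "nullable": True, "metadata": {}},
                    {"name": "r", "nullable": True, "metadata": {}, "type": {
                        "type": "struct",
                        "fields": [
                            {"name": "n", "type": "string",
                             "nullable": True, "metadata": {}},
                        ]}},
                ]}}},
    ],
}

print(path_exists(example_schema, "x.p"))
print(path_exists(example_schema, "x.r.n"))
print(path_exists(example_schema, "x.q"))
```

In a real job the call would look like `path_exists(df.schema.jsonValue(), "x.p")`. An empty input file yields a schema with no fields, for which every path is reported as missing.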
Another solution is to provide a schema when reading the JSON file, as suggested by Hristo Iliev. Keys that are missing from the input then show up as NULL columns instead of being absent:
schema = StructType([
    # "x" is declared as an array of structs so that explode_outer can be applied to it.
    StructField("x", ArrayType(StructType([
        StructField("p", StringType()),
        StructField("q", StringType()),
        # ....
    ])))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)