How to append values from multiple JSON schemas into a single column of a DataFrame using PySpark


I have multiple source JSON files, and each JSON file has a different schema.

For example, take the 3 JSON files below. I've presented them in a tabular view for easier understanding, and I've also provided each file's schema:

JSON File 1:

| Key | Value | Column_Name |
| --- | --- | --- |
| sample_column.pull.notify.roid.alert | aaa | Column A |
| sample_column.pull.notify.roid.title | bbb | Column B |
| sample_column.pull.notify.roid.action.pan.content | ccc | Column C |

JSON File 1 Schema:

|-- sample_column: struct (nullable = true)
|    |-- pull: struct (nullable = true)
|    |    |-- notify: struct (nullable = true)
|    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |-- title: string (nullable = true)

JSON File 2:

| Key | Value | Column_Name |
| --- | --- | --- |
| sample_column.cone.pull.notify.roid.alert | a1a1a1 | Column A |
| sample_column.cone.pull.notify.roid.title | b1b1b1 | Column B |
| sample_column.cone.pull.notify.roid.action.pan.content | c1c1c1 | Column C |

JSON File 2 Schema:

|-- sample_column: struct (nullable = true)
|    |-- cone: struct (nullable = true)
|    |    |-- pull: struct (nullable = true)
|    |    |    |-- notify: struct (nullable = true)
|    |    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |    |-- title: string (nullable = true)

JSON File 3:

| Key | Value | Column_Name |
| --- | --- | --- |
| sample_column.var.pull.notify.roid.alert | a2a2a2 | Column A |
| sample_column.var.pull.notify.roid.title | b2b2b2 | Column B |
| sample_column.var.pull.notify.roid.action.pan.content | c2c2c2 | Column C |

JSON File 3 Schema:

|-- sample_column: struct (nullable = true)
|    |-- var: struct (nullable = true)
|    |    |-- pull: struct (nullable = true)
|    |    |    |-- notify: struct (nullable = true)
|    |    |    |    |-- roid: struct (nullable = true)
|    |    |    |    |    |-- action: struct (nullable = true)
|    |    |    |    |    |    |-- pan: struct (nullable = true)
|    |    |    |    |    |    |    |-- content: string (nullable = true)
|    |    |    |    |    |-- alert: string (nullable = true)
|    |    |    |    |    |-- title: string (nullable = true)

I need to get "alert", "title", and "content" from those 3 different schemas and put all the "alert" values under Column A, all the "title" values under Column B, and all the "content" values under Column C.

This is the expected output table:

| Column A | Column B | Column C |
| --- | --- | --- |
| aaa | bbb | ccc |
| a1a1a1 | b1b1b1 | c1c1c1 |
| a2a2a2 | b2b2b2 | c2c2c2 |

Per the scenario above, I tried to append the values from the different schemas, with all the ('a') values under Column A, the ('b') values under Column B, and the ('c') values under Column C, but the values get overwritten, like below:

| Column A | Column B | Column C |
| --- | --- | --- |
| a2a2a2 | b2b2b2 | c2c2c2 |

Below is the PySpark code I tried; it overwrites the values instead of appending them:

from pyspark.sql.functions import lit, when

def has_column(df, col):
    # Returns True if the dataframe has the column, else False.
    try:
        df[col]
        return True
    except Exception:
        return False

if has_column(df, "sample_column.pull.notify.roid.alert"):
    try:
        df = df.withColumn('Column A', when(df.sample_column.pull.notify.roid.alert.isNotNull(), df["sample_column.pull.notify.roid.alert"]))
    except Exception:
        df = df.withColumn('Column A', lit("").cast("string"))

if has_column(df, "sample_column.cone.pull.notify.roid.title"):
    try:
        df = df.withColumn('Column A', when(df.sample_column.cone.pull.notify.roid.title.isNotNull(), df["sample_column.cone.pull.notify.roid.title"]))
    except Exception:
        df = df.withColumn('Column A', lit("").cast("string"))

if has_column(df, "sample_column.var.pull.notify.roid.action.pan.content"):
    try:
        df = df.withColumn('Column A', when(df.sample_column.var.pull.notify.roid.action.pan.content.isNotNull(), df["sample_column.var.pull.notify.roid.action.pan.content"]))
    except Exception:
        df = df.withColumn('Column A', lit("").cast("string"))

Help is much appreciated!

CodePudding user response:

    from pyspark.sql.functions import collect_list

    data = [["aaa", "bbb", "ccc"], ["a1a1a1", "b1b1b1", "c1c1c1"], ["a2a2a2", "b2b2b2", "c2c2c2"]]
    columns = ["ColumnA", "ColumnB", "ColumnC"]
    df_in = spark.createDataFrame(data, columns)
    (
      df_in
        .agg(collect_list("ColumnA").alias("ColumnA"),
             collect_list("ColumnB").alias("ColumnB"),
             collect_list("ColumnC").alias("ColumnC"))
      .show(10, False)
    )
    # This result may be what you wanted.
    # +---------------------+---------------------+---------------------+
    # |ColumnA              |ColumnB              |ColumnC              |
    # +---------------------+---------------------+---------------------+
    # |[aaa, a1a1a1, a2a2a2]|[bbb, b1b1b1, b2b2b2]|[ccc, c1c1c1, c2c2c2]|
    # +---------------------+---------------------+---------------------+
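If you'd rather build `df_in` straight from the three JSON files than from literal data, one option is to read each file, select its fields under common aliases, and union the results. A minimal sketch, assuming hypothetical file names:

    from pyspark.sql.functions import col

    # Select each file's nested fields under the shared column names
    df1 = spark.read.json("file1.json").select(
        col("sample_column.pull.notify.roid.alert").alias("ColumnA"),
        col("sample_column.pull.notify.roid.title").alias("ColumnB"),
        col("sample_column.pull.notify.roid.action.pan.content").alias("ColumnC"))
    df2 = spark.read.json("file2.json").select(
        col("sample_column.cone.pull.notify.roid.alert").alias("ColumnA"),
        col("sample_column.cone.pull.notify.roid.title").alias("ColumnB"),
        col("sample_column.cone.pull.notify.roid.action.pan.content").alias("ColumnC"))
    df3 = spark.read.json("file3.json").select(
        col("sample_column.var.pull.notify.roid.alert").alias("ColumnA"),
        col("sample_column.var.pull.notify.roid.title").alias("ColumnB"),
        col("sample_column.var.pull.notify.roid.action.pan.content").alias("ColumnC"))

    # One row per file, matching the expected output table
    df_in = df1.unionByName(df2).unionByName(df3)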

CodePudding user response:

You can handle this with coalesce.

First, set up what you want to extract.

from pyspark.sql.functions import coalesce, col

paths = ['sample_column.pull.notify.roid',
         'sample_column.cone.pull.notify.roid',
         'sample_column.var.pull.notify.roid']

extract_keys = ['alert', 'title', 'action.pan.content']

Then use coalesce to pick the value from whichever path exists.

df = (df.select(*[
          coalesce(*[col(f'{path}.{key}') for path in paths]).alias(key)
          for key in extract_keys
        ])
      .withColumnRenamed('action.pan.content', 'content')
)
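For completeness, here is a sketch of how this can fit end to end, assuming hypothetical file names. Reading the files together lets Spark merge their schemas, so every path resolves on every row, with nulls where a path is absent:

from pyspark.sql.functions import coalesce, col

# One DataFrame with the merged schema of all three files
df = spark.read.json(["file1.json", "file2.json", "file3.json"])

df = (df.select(*[
          coalesce(*[col(f'{path}.{key}') for path in paths]).alias(key)
          for key in extract_keys
        ])
      .withColumnRenamed('action.pan.content', 'content')
)

# Optionally rename to the target columns from the question
df = (df.withColumnRenamed('alert', 'Column A')
        .withColumnRenamed('title', 'Column B')
        .withColumnRenamed('content', 'Column C'))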

Explanation

I used a double list comprehension in the example above, but you can break it down.

This expression:

coalesce(*[col(f'{path}.title') for path in paths])

translates to

coalesce('sample_column.pull.notify.roid.title',
         'sample_column.cone.pull.notify.roid.title',
         'sample_column.var.pull.notify.roid.title')

and this returns whichever path has a non-null value for each row (one row = one JSON file).

I do this for every key in extract_keys; that is the outer list comprehension.
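Fully expanded, the whole select is equivalent to:

df = (df.select(
          coalesce(col('sample_column.pull.notify.roid.alert'),
                   col('sample_column.cone.pull.notify.roid.alert'),
                   col('sample_column.var.pull.notify.roid.alert')).alias('alert'),
          coalesce(col('sample_column.pull.notify.roid.title'),
                   col('sample_column.cone.pull.notify.roid.title'),
                   col('sample_column.var.pull.notify.roid.title')).alias('title'),
          coalesce(col('sample_column.pull.notify.roid.action.pan.content'),
                   col('sample_column.cone.pull.notify.roid.action.pan.content'),
                   col('sample_column.var.pull.notify.roid.action.pan.content')).alias('action.pan.content'))
      .withColumnRenamed('action.pan.content', 'content')
)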

Consideration

I hardcoded the paths in this example. If you want to go further and derive the paths without hardcoding them, look into "Is there a way to collect the names of all fields in a nested schema in pyspark" for some ideas.
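For reference, a rough sketch of that idea (my own sketch, not code from the linked question): walk df.schema recursively and collect the dotted path of every leaf field, then derive the roid paths and keys from the result.

from pyspark.sql.types import StructType

def field_paths(schema, prefix=""):
    # Recursively collect the dotted path of every leaf field in a schema
    paths = []
    for field in schema.fields:
        name = prefix + field.name
        if isinstance(field.dataType, StructType):
            paths += field_paths(field.dataType, name + ".")
        else:
            paths.append(name)
    return paths

# e.g. on JSON file 1's schema this returns:
# ['sample_column.pull.notify.roid.action.pan.content',
#  'sample_column.pull.notify.roid.alert',
#  'sample_column.pull.notify.roid.title']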
