I have multiple source JSON files, each with a different schema. For example, take the 3 JSON files below. I've presented them in a tabular view for easier understanding, and I've also provided each file's schema:
JSON File 1:
Key | Value | Column_Name |
---|---|---|
sample_column.pull.notify.roid.alert | aaa | Column A |
sample_column.pull.notify.roid.title | bbb | Column B |
sample_column.pull.notify.roid.action.pan.content | ccc | Column C |
JSON File 1 Schema:
|-- sample_column: struct (nullable = true)
| |-- pull: struct (nullable = true)
| | |-- notify: struct (nullable = true)
| | | |-- roid: struct (nullable = true)
| | | | |-- action: struct (nullable = true)
| | | | | |-- pan: struct (nullable = true)
| | | | | | |-- content: string (nullable = true)
| | | | |-- alert: string (nullable = true)
| | | | |-- title: string (nullable = true)
JSON File 2:
Key | Value | Column_Name |
---|---|---|
sample_column.cone.pull.notify.roid.alert | a1a1a1 | Column A |
sample_column.cone.pull.notify.roid.title | b1b1b1 | Column B |
sample_column.cone.pull.notify.roid.action.pan.content | c1c1c1 | Column C |
JSON File 2 Schema:
|-- sample_column: struct (nullable = true)
| |-- cone: struct (nullable = true)
| | |-- pull: struct (nullable = true)
| | | |-- notify: struct (nullable = true)
| | | | |-- roid: struct (nullable = true)
| | | | | |-- action: struct (nullable = true)
| | | | | | |-- pan: struct (nullable = true)
| | | | | | | |-- content: string (nullable = true)
| | | | | |-- alert: string (nullable = true)
| | | | | |-- title: string (nullable = true)
JSON File 3:
Key | Value | Column_Name |
---|---|---|
sample_column.var.pull.notify.roid.alert | a2a2a2 | Column A |
sample_column.var.pull.notify.roid.title | b2b2b2 | Column B |
sample_column.var.pull.notify.roid.action.pan.content | c2c2c2 | Column C |
JSON File 3 Schema:
|-- sample_column: struct (nullable = true)
| |-- var: struct (nullable = true)
| | |-- pull: struct (nullable = true)
| | | |-- notify: struct (nullable = true)
| | | | |-- roid: struct (nullable = true)
| | | | | |-- action: struct (nullable = true)
| | | | | | |-- pan: struct (nullable = true)
| | | | | | | |-- content: string (nullable = true)
| | | | | |-- alert: string (nullable = true)
| | | | | |-- title: string (nullable = true)
I need to get "alert", "title", and "content" from those 3 different schemas and put all the "alert" values under Column A, all the "title" values under Column B, and all the "content" values under Column C.
This will be the expected output Table:
Column A | Column B | Column C |
---|---|---|
aaa | bbb | ccc |
a1a1a1 | b1b1b1 | c1c1c1 |
a2a2a2 | b2b2b2 | c2c2c2 |
Following the above scenario, I tried to append the values from the different schemas (all the 'a' values under Column A, the 'b' values under Column B, and the 'c' values under Column C), but the values get overwritten, as shown below:
Column A | Column B | Column C |
---|---|---|
a2a2a2 | b2b2b2 | c2c2c2 |
Below is the PySpark code that I've tried; it overwrites the values instead of appending them:
def has_column(df, col):
    # Returns True if the dataframe has the column, otherwise False.
    try:
        df[col]
        return True
    except Exception:
        return False

if has_column(df, "sample_column.pull.notify.roid.alert"):
    try:
        df = df.withColumn('Column A', when(df.sample_column.pull.notify.roid.alert.isNotNull(),
                                            df["sample_column.pull.notify.roid.alert"]))
    except Exception:
        df = df.withColumn('Column A', lit("").cast("string"))

if has_column(df, "sample_column.cone.pull.notify.roid.title"):
    try:
        df = df.withColumn('Column B', when(df.sample_column.cone.pull.notify.roid.title.isNotNull(),
                                            df["sample_column.cone.pull.notify.roid.title"]))
    except Exception:
        df = df.withColumn('Column B', lit("").cast("string"))

if has_column(df, "sample_column.var.pull.notify.roid.action.pan.content"):
    try:
        df = df.withColumn('Column C', when(df.sample_column.var.pull.notify.roid.action.pan.content.isNotNull(),
                                            df["sample_column.var.pull.notify.roid.action.pan.content"]))
    except Exception:
        df = df.withColumn('Column C', lit("").cast("string"))
Help is much appreciated!
CodePudding user response:
from pyspark.sql.functions import collect_list

data = [["aaa", "bbb", "ccc"], ["a1a1a1", "b1b1b1", "c1c1c1"], ["a2a2a2", "b2b2b2", "c2c2c2"]]
columns = ["ColumnA", "ColumnB", "ColumnC"]
df_in = spark.createDataFrame(data, columns)
(
    df_in
    .agg(collect_list("ColumnA").alias("ColumnA"),
         collect_list("ColumnB").alias("ColumnB"),
         collect_list("ColumnC").alias("ColumnC"))
    .show(10, False)
)
# This result may be what you wanted.
# +---------------------+---------------------+---------------------+
# |ColumnA              |ColumnB              |ColumnC              |
# +---------------------+---------------------+---------------------+
# |[aaa, a1a1a1, a2a2a2]|[bbb, b1b1b1, b2b2b2]|[ccc, c1c1c1, c2c2c2]|
# +---------------------+---------------------+---------------------+
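To see what the aggregation is doing, here is a plain-Python analogue of `collect_list` (an illustration only, not the Spark API), using the same sample values:

```python
# The three one-row results stacked together, as a union of the
# per-file frames would hold them
data = [["aaa", "bbb", "ccc"],
        ["a1a1a1", "b1b1b1", "c1c1c1"],
        ["a2a2a2", "b2b2b2", "c2c2c2"]]
columns = ["ColumnA", "ColumnB", "ColumnC"]

# collect_list gathers each column's values across rows into a single list
collected = {name: [row[i] for row in data] for i, name in enumerate(columns)}
print(collected["ColumnA"])  # ['aaa', 'a1a1a1', 'a2a2a2']
```

The key point is that appending only happens once each file contributes its own row; aggregating then turns rows into lists per column.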
CodePudding user response:
You can handle them with coalesce.
First, set up what you want to extract.
paths = ['sample_column.pull.notify.roid',
         'sample_column.cone.pull.notify.roid',
         'sample_column.var.pull.notify.roid']
extract_keys = ['alert', 'title', 'action.pan.content']
Then use coalesce to get value where it exists.
from pyspark.sql.functions import coalesce, col

df = (df.select(*[
          coalesce(*[col(f'{path}.{key}') for path in paths]).alias(key)
          for key in extract_keys
      ])
      .withColumnRenamed('action.pan.content', 'content')
)
Explanation

I used nested list comprehensions in the example above, but you can break them down.
This expression:
coalesce(*[col(f'{path}.title') for path in paths])
translates to
coalesce('sample_column.pull.notify.roid.title',
         'sample_column.cone.pull.notify.roid.title',
         'sample_column.var.pull.notify.roid.title')
and it returns whichever path has a non-null value for each row (one row = one JSON file).
The same is done for every entry in extract_keys; that is the outer list comprehension.
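As a sanity check on that behaviour, here is a plain-Python analogue of coalesce (first non-null wins); the row values are hypothetical, with only one of the three candidate paths populated, as in File 2:

```python
def first_non_null(*values):
    # Mirrors SQL/Spark coalesce: return the first value that is not None
    return next((v for v in values if v is not None), None)

# Each JSON file yields one row; only one candidate path resolves
row_file2 = {
    "sample_column.pull.notify.roid.alert": None,           # File 1 path: absent
    "sample_column.cone.pull.notify.roid.alert": "a1a1a1",  # File 2 path: present
    "sample_column.var.pull.notify.roid.alert": None,       # File 3 path: absent
}
alert = first_non_null(*row_file2.values())
print(alert)  # a1a1a1
```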
Consideration
I hardcoded the paths in this example. If you want to go further and discover the paths
without hardcoding them, look into "Is there a way to collect the names of all fields in a nested schema in pyspark" for some ideas.
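As a starting point for that, a recursive walk can collect the dotted paths of all leaf fields. The sketch below uses a nested dict standing in for a StructType schema (the field names are the sample ones from File 1; adapting it to iterate a real `df.schema` is left as an exercise):

```python
def leaf_paths(schema, prefix=""):
    # Recursively collect dotted paths to all leaf fields of a nested mapping
    paths = []
    for name, child in schema.items():
        full = f"{prefix}.{name}" if prefix else name
        if isinstance(child, dict):
            paths.extend(leaf_paths(child, full))
        else:
            paths.append(full)
    return paths

# Nested dict standing in for JSON File 1's schema
schema = {"sample_column": {"pull": {"notify": {"roid": {
    "alert": "string",
    "title": "string",
    "action": {"pan": {"content": "string"}},
}}}}}

print(leaf_paths(schema))
# ['sample_column.pull.notify.roid.alert',
#  'sample_column.pull.notify.roid.title',
#  'sample_column.pull.notify.roid.action.pan.content']
```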