I am trying to create multiple columns dynamically, based on a filter condition, after comparing two DataFrames with the code below.
source_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1| john|
|def|  3.0| dani|
+---+-----+-----+

dest_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  2.1| jack|
|def|  3.0| dani|
+---+-----+-----+
from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')

for column in columns:
    column_name = "difference_in_" + str(column)
    report = joined_df\
        .filter(source_df[column] != dest_df[column])\
        .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                          F.lit(',dst:'), dest_df[column], F.lit(']')))
The output I expect is:

#Expected
+---+-------------------+-------------------+
|key|difference_in_val11|difference_in_val12|
+---+-------------------+-------------------+
|abc|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-------------------+-------------------+
But I only get the result for the last column:

#Actual
+---+-------------------+
|key|difference_in_val12|
+---+-------------------+
|abc|[src:john,dst:jack]|
+---+-------------------+
How can I generate multiple columns dynamically based on a filter condition?
CodePudding user response:
DataFrames are immutable objects, and in your loop every iteration builds `report` from `joined_df` again, so each pass overwrites the column added in the previous one. You need to create the next DataFrame from the one generated in the previous iteration. Something like below -
from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')

for column in columns:
    column_name = "difference_in_" + str(column)
    if column != columns[-1]:
        # not the last column: build the first report from the joined DataFrame
        report = joined_df\
            .filter(source_df[column] != dest_df[column])\
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                              F.lit(',dst:'), dest_df[column], F.lit(']')))
    else:
        # last column: build on the DataFrame produced by the previous iteration
        report1 = report\
            .filter(source_df[column] != dest_df[column])\
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                              F.lit(',dst:'), dest_df[column], F.lit(']')))

report1.show()
Output -
+---+-----+-----+-----+-----+-------------------+-------------------+
|key|val11|val12|val11|val12|difference_in_val11|difference_in_val12|
+---+-----+-----+-----+-----+-------------------+-------------------+
|abc|  1.1| john|  2.1| jack|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-----+-----+-----+-----+-------------------+-------------------+
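The same chaining idea can also be written without special-casing the last column: start `report` from the joined DataFrame and reassign it on every pass, so each iteration builds on the previous one. A minimal sketch, reusing the question's `source_df` and `dest_df`:

from pyspark.sql import functions as F

columns = source_df.columns[1:]
report = source_df.join(dest_df, 'key', 'full')

for column in columns:
    # reassigning `report` keeps the columns added in earlier iterations
    report = report\
        .filter(source_df[column] != dest_df[column])\
        .withColumn("difference_in_" + column,
                    F.concat(F.lit('[src:'), source_df[column],
                             F.lit(',dst:'), dest_df[column], F.lit(']')))

report.select("key", *["difference_in_" + c for c in columns]).show()

Note that the successive filters keep only rows that differ in every compared column, which is also what the loop above does and matches the expected output.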
CodePudding user response:
You could also do this with a union of both DataFrames and then collect the list only where the collect_set size is greater than 1; this avoids joining the DataFrames:
from pyspark.sql import functions as F

cols = source_df.drop("key").columns

output = (source_df.withColumn("ref", F.lit("src:"))
          .unionByName(dest_df.withColumn("ref", F.lit("dst:")))
          .groupBy("key")
          # keep a column's values only when src and dst disagree (collect_set size > 1)
          .agg(*[F.when(F.size(F.collect_set(i)) > 1,
                        F.collect_list(F.concat("ref", i))).alias(i)
                 for i in cols])
          .dropna(subset=cols, how='all'))

output.show()
+---+------------------+--------------------+
|key|             val11|               val12|
+---+------------------+--------------------+
|abc|[src:1.1, dst:2.1]|[src:john, dst:jack]|
+---+------------------+--------------------+
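If the column names should match the `difference_in_*` format from the question, a rename after the aggregation does it; a small sketch, assuming the `output` and `cols` variables from the snippet above:

# rename the aggregated columns to the difference_in_* naming used in the question
renamed = output.select(
    "key",
    *[F.col(c).alias("difference_in_" + c) for c in cols]
)
renamed.show(truncate=False)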