How to add multiple columns dynamically based on a filter condition


I am trying to create multiple columns dynamically, based on a filter condition, after comparing two DataFrames with the code below.

source_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1| john|
|def|  3.0| dani|
+---+-----+-----+

dest_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  2.1| jack|
|def|  3.0| dani|
+---+-----+-----+
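
For reference, the two DataFrames above can be built with a minimal sketch like this (assuming a standard local SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the tables above
source_df = spark.createDataFrame(
    [("abc", 1.1, "john"), ("def", 3.0, "dani")],
    ["key", "val11", "val12"],
)
dest_df = spark.createDataFrame(
    [("abc", 2.1, "jack"), ("def", 3.0, "dani")],
    ["key", "val11", "val12"],
)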

from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')
for column in columns:
    column_name = "difference_in_" + str(column)
    report = joined_df\
        .filter(source_df[column] != dest_df[column])\
        .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column], F.lit(',dst:'), dest_df[column], F.lit(']')))


The output I expect is

#Expected
+---+-------------------+-------------------+
|key|difference_in_val11|difference_in_val12|
+---+-------------------+-------------------+
|abc|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-------------------+-------------------+

But I only get the last difference column in the result:

#Actual
+---+-------------------+
|key|difference_in_val12|
+---+-------------------+
|abc|[src:john,dst:jack]|
+---+-------------------+

How can I generate multiple columns dynamically based on the filter condition?

CodePudding user response:

DataFrames are immutable objects, so each withColumn call returns a new DataFrame. That means you need to build the next column on top of the DataFrame produced in the previous iteration instead of starting from joined_df every time. Something like below -

from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')

for column in columns:
    column_name = "difference_in_" + str(column)
    if column != columns[-1]:
        # All but the last column: start from the joined DataFrame
        report = joined_df\
            .filter(source_df[column] != dest_df[column])\
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column], F.lit(',dst:'), dest_df[column], F.lit(']')))
    else:
        # Last column: build on the DataFrame produced in the previous iteration
        report1 = report\
            .filter(source_df[column] != dest_df[column])\
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column], F.lit(',dst:'), dest_df[column], F.lit(']')))

report1.show()
#report.show()

Output -

+---+-----+-----+-----+-----+-------------------+-------------------+
|key|val11|val12|val11|val12|difference_in_val11|difference_in_val12|
+---+-----+-----+-----+-----+-------------------+-------------------+
|abc|  1.1| john|  2.1| jack|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-----+-----+-----+-----+-------------------+-------------------+
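
The same idea generalizes to any number of columns by reassigning the running result inside the loop instead of special-casing the last column. A minimal sketch with the same semantics as above (rows must differ in every compared column):

from pyspark.sql import functions as F

columns = source_df.columns[1:]

# Start from the joined DataFrame and keep reassigning the running result,
# so every iteration builds on the previous one
report = source_df.join(dest_df, 'key', 'full')
for column in columns:
    column_name = "difference_in_" + str(column)
    report = report\
        .filter(source_df[column] != dest_df[column])\
        .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column], F.lit(',dst:'), dest_df[column], F.lit(']')))

# Keep only the key and the generated difference columns
report.select('key', *["difference_in_" + c for c in columns]).show()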

CodePudding user response:

You could also do this with a union of both DataFrames and then collect the list only if the collect_set size is greater than 1; this avoids joining the DataFrames:

from pyspark.sql import functions as F

cols = source_df.drop("key").columns

output = (source_df.withColumn("ref", F.lit("src:"))
          .unionByName(dest_df.withColumn("ref", F.lit("dst:")))
          .groupBy("key")
          .agg(*[F.when(F.size(F.collect_set(i)) > 1,
                        F.collect_list(F.concat("ref", i))).alias(i)
                 for i in cols])
          .dropna(subset=cols, how='all'))

output.show()

+---+------------------+--------------------+
|key|             val11|               val12|
+---+------------------+--------------------+
|abc|[src:1.1, dst:2.1]|[src:john, dst:jack]|
+---+------------------+--------------------+
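
If you want the column names and formatting to match the expected difference_in_ output, you could post-process the aggregated columns; a rough sketch (assuming Spark 2.4+ for array_join):

# Render each collected list as a single "[src:...,dst:...]" string and rename
# note: element order inside collect_list is not guaranteed
for c in cols:
    output = (output
              .withColumn(c, F.concat(F.lit('['), F.array_join(c, ','), F.lit(']')))
              .withColumnRenamed(c, "difference_in_" + c))

output.show(truncate=False)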