How to create a join condition using a loop?

Time:07-12

I am creating a generic condition for joining two dataframes that have the same key and the same structure, as in the code below. I would like to make it a function for comparing two dataframes. My first idea was to build the condition as a string, since a string is easy to concatenate in a loop. However, it turns out the join condition won't accept a string built this way. Could somebody please guide me on this?

import pyspark.sql.functions as F

key = "col1 col2 col3"

def CompareData(df1,df2,key) :
  key_list = key.split(" ")
  key_con=""

  for col in key_list:
    condi = "(F.col(\"" + col + "\") == F.col(\"" + "x_" + col + "\"))"  # trying to generate generic condition
    key_con = key_con + "&" + condi

  key_condition=key_con.replace('&','',1)

  df1_tmp = df1.select([F.col(c).alias("x_" + c) for c in df1.columns])

  df_compare = df2.join(df1_tmp, key_condition , "left")  # The problem was here. key_condition has error. If I copy the condition string below and place into join condition, it works fine.

  # key_condition = (F.col("col1") == F.col("x_col1")) & (F.col("col2") == F.col("x_col2")) & (F.col("col3") == F.col("x_col3")) 

CodePudding user response:

Try this:

  key_con = F.lit(True)
  for col in key_list:
    condi = (F.col(col) == F.col(f"x_{col}"))
    key_con = key_con & condi

In your attempt, the condition is of type string. But join's `on` argument only accepts a string when it is a plain column name (or a list of column names). What you are trying to pass is a column expression, and a Column expression is not the same thing as a string, so you need a slightly different method to build a composite Column expression — combine actual Column objects with `&`, as above, rather than concatenating text.
