PySpark join based on multiple parameterized conditions


So I have two PySpark dataframes; let's call them A and B. I want to perform a left join based on multiple conditions. Let's say the columns on which to join are the following:

cond = [A.columnA1 == B.columnB1, A.columnA2 == B.columnB2]
df = A.join(B, cond, 'left')

Now, what if I don't know the column names in advance and want to parameterize this? Imagine the user is allowed to pass two lists of column names on which to join (each list may contain more than two columns; we don't know how many).

Imagine we have the following lists of columns on which we want to join, which take their input from the user:

columnlistA = []
columnlistB = []

The user will pass any number of column names for each of these lists, but both lists will always have the same length, such that the first element of columnlistA corresponds to the first element of columnlistB in the join, and so on for the remaining elements. How do I write the join so that these column lists are used as parameters in the join condition for these dataframes?
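For instance (the column names below are made up, just to illustrate the shape of the input), the user might supply:

columnlistA = ["customer_id", "order_date"]   # columns from A
columnlistB = ["cust_id", "ord_date"]         # corresponding columns from B

# The goal is a join equivalent to writing this by hand:
# cond = [A.customer_id == B.cust_id, A.order_date == B.ord_date]
# df = A.join(B, cond, 'left')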

CodePudding user response:

You can do that by using aliases for your dataframes. That way, you can refer to their columns with simple strings.

If I alias a dataframe as myDataFrame, I can refer to its columns in a string like this:

import pyspark.sql.functions as F
df = spark.createDataFrame(.....)
aliased_df = df.alias("myDataFrame")
F.col("myDataFrame.columnName")  # this is the same as df.columnName

So you can use that to construct a list with your columns dynamically specified:

A.alias("dfA").join(
  B.alias("dfB"),
  [F.col("dfA." col_a) == F.col("dfB." col_b) for col_a, col_b in zip(columnlistA, columnlistB)],
  'left'
)
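Here is a minimal, self-contained sketch of that pattern with made-up data (the dataframe contents and column names are assumptions, purely for illustration):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up sample data, only to demonstrate the dynamic join
A = spark.createDataFrame([(1, "x"), (2, "y")], ["a1", "a2"])
B = spark.createDataFrame([(1, "x"), (3, "z")], ["b1", "b2"])

columnlistA = ["a1", "a2"]
columnlistB = ["b1", "b2"]

# One equality condition per pair of columns, built from the alias names
cond = [
    F.col("dfA." + col_a) == F.col("dfB." + col_b)
    for col_a, col_b in zip(columnlistA, columnlistB)
]

A.alias("dfA").join(B.alias("dfB"), cond, "left").show()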

CodePudding user response:

The following solution is based on two lists from which it generates the join conditions. It assumes that the comparison operator between columns is always ==. You can control the boolean operator used between the conditions via the op argument (only "and" and "or" are allowed).

from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import Column
from pyspark.sql.column import _bin_op

def generate_conditions(left_cols: list, right_cols: list, op: str = "or") -> Column:
  if not left_cols or not right_cols:
    raise Exception("The lists should not be empty.")

  if len(left_cols) != len(right_cols):
    raise Exception("The lists should have the same length.")

  if op not in ["and", "or"]:
    raise Exception("Only [and, or] binary operators are allowed.")

  # Build one equality condition per column pair, then chain them together with
  # the requested boolean operator (_bin_op is PySpark's internal helper that
  # applies a named binary operator to two Columns).
  condition_list = reduce(lambda x, y: _bin_op(op)(x, y), [(col(l) == col(r)) for l, r in zip(left_cols, right_cols)])

  return condition_list

l = ["a1", "a2", "a3"]
r = ["b1", "b2", "b3"]

join_conditions = generate_conditions(l, r, "or")

print(join_conditions)
# Column<'(((a1 = b1) OR (a2 = b2)) OR (a3 = b3))'>

Now you can use it in your join as A.join(B, join_conditions, 'left')
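As a quick illustration (assuming an active SparkSession named spark; the dataframes below are made up), the "and" variant works the same way:

# Made-up dataframes purely for illustration
A = spark.createDataFrame([(1, 10, 100)], ["a1", "a2", "a3"])
B = spark.createDataFrame([(1, 20, 100)], ["b1", "b2", "b3"])

# Chain the equalities with AND instead of OR
and_conditions = generate_conditions(["a1", "a3"], ["b1", "b3"], "and")
print(and_conditions)
# Column<'((a1 = b1) AND (a3 = b3))'>

A.join(B, and_conditions, "left").show()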
