PYSPARK - join nullsafe on multiple columns

Time:06-30

Assume we have two dataframes that we want to compare for differences with a leftanti join:

data1 = [
  (1, 11, 20, None),
  (2, 12, 22, 31),
]

data2 = [
  (1, 11, 20, None),
  (2, 12, 22, 31),
]

from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
    StructField("value_1", IntegerType(), True),
    StructField("value_2", IntegerType(), True),
    StructField("value_3", IntegerType(), True),
    StructField("value_4", IntegerType(), True),
])

df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)

How can I do a null-safe join of these dataframes on multiple (all) columns? The only solution I came up with is as follows:

df = df1.join(
    df2,
    (df1.value_1.eqNullSafe(df2.value_1))
    & (df1.value_2.eqNullSafe(df2.value_2))
    & (df1.value_3.eqNullSafe(df2.value_3))
    & (df1.value_4.eqNullSafe(df2.value_4)),
    "leftanti",
)

Unfortunately, we now have to deal with a dynamic list containing a large number of columns. How can we rewrite this join so that we can pass in a list of columns to join on?

THX & BR

CodePudding user response:

As far as I understand the problem statement, you want to build a dynamic join condition from a list of columns that you provide. We can do that using reduce() from the functools module.

join_cols = ['value_1', 'value_2', 'value_3', 'value_4']

from functools import reduce

join_condition = reduce(lambda x, y: x & y, [df1[k].eqNullSafe(df2[k]) for k in join_cols])

print(join_condition)
# Column<'((((value_1 <=> value_1) AND (value_2 <=> value_2)) AND (value_3 <=> value_3)) AND (value_4 <=> value_4))'>
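The <=> in the printed condition is Spark SQL's null-safe equality operator, which is what eqNullSafe() produces. To illustrate why it matters for the row (1, 11, 20, None), here is a rough plain-Python model of the two comparison semantics. The function names sql_eq, sql_eq_null_safe and rows_match are made up for this sketch and are not part of PySpark; None stands in for SQL NULL.

```python
def sql_eq(a, b):
    """Model of plain `=`: the result is unknown (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a == b


def sql_eq_null_safe(a, b):
    """Model of `<=>` / eqNullSafe: always True or False, and NULL matches NULL."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def rows_match(row_a, row_b, eq):
    """A join pairs two rows only if every column comparison is exactly True."""
    return all(eq(a, b) is True for a, b in zip(row_a, row_b))


row = (1, 11, 20, None)
print(rows_match(row, row, sql_eq))            # False
print(rows_match(row, row, sql_eq_null_safe))  # True
```

With plain equality the row containing None never matches its identical counterpart, so a leftanti join would report it as a difference; with null-safe equality it matches.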

You can use the join_condition variable in the .join() directly.

df = df1.join(df2, join_condition, "leftanti")
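If the condition is needed in several places, the reduce() expression above could be wrapped in a small helper. This is only a sketch: null_safe_condition is a hypothetical name, not a PySpark function, and it assumes both inputs support indexing by column name and return objects with eqNullSafe() and &, as PySpark DataFrames and Columns do.

```python
from functools import reduce
from operator import and_


def null_safe_condition(left, right, cols):
    """Combine left[c] <=> right[c] for every c in cols with AND.

    Hypothetical helper; `left` and `right` are expected to be DataFrames.
    """
    cols = list(cols)
    if not cols:
        # reduce() without an initial value raises TypeError on an empty
        # sequence, so fail early with a clearer message instead.
        raise ValueError("need at least one join column")
    return reduce(and_, (left[c].eqNullSafe(right[c]) for c in cols))
```

To join on all columns, as in the question, the list can be taken from the dataframe itself: df1.join(df2, null_safe_condition(df1, df2, df1.columns), "leftanti").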