Home > Mobile >  Why joining structure-identic dataframes gives different results?
Why joining structure-identic dataframes gives different results?

Time:09-27

Input df structures are identic in both runs, but outputs are different. Only the second run returns desired result (df6). I know I can use aliases for dataframes which would return desired result.

The question. What is the underlying Spark mechanics in creating df3? Spark reads df1.c1 == df2.c2 in the join's on clause, but it's evident that it does not pay attention to the dfs provided. What's under the hood there? How to anticipate such behaviour?

First run (incorrect df3 result):

data = [
    (1, 'bad', 'A'),
    (4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()

# --- ------ ---- --- 
#| ID|Status|  c1| c2|
# --- ------ ---- --- 
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
# --- ------ ---- --- 

df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()

# --- ------ ---- --- 
#| ID|Status|  c1| c2|
# --- ------ ---- --- 
#|  4|    ok|null|  A|
# --- ------ ---- --- 

df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()

# ---- ------ ---- ---- ---- ------ ---- ---- 
#|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
# ---- ------ ---- ---- ---- ------ ---- ---- 
#|   4|    ok|null|   A|null|  null|null|null|
#|null|  null|null|null|   1|   bad|   A|   A|
#|null|  null|null|null|   4|    ok|null|   A|
# ---- ------ ---- ---- ---- ------ ---- ---- 

Second run (correct df6 result):

data = [
    (1, 'bad', 'A', 'A'),
    (4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()

# --- ------ ---- --- 
#| ID|Status|  c1| c2|
# --- ------ ---- --- 
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
# --- ------ ---- --- 

df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()

# --- ------ ---- --- 
#| ID|Status|  c1| c2|
# --- ------ ---- --- 
#|  4|    ok|null|  A|
# --- ------ ---- --- 

df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()

# ---- ------ ---- ---- --- ------ ---- --- 
#|  ID|Status|  c1|  c2| ID|Status|  c1| c2|
# ---- ------ ---- ---- --- ------ ---- --- 
#|null|  null|null|null|  4|    ok|null|  A|
#|   4|    ok|null|   A|  1|   bad|   A|  A|
# ---- ------ ---- ---- --- ------ ---- --- 

I can see the physical plans are different in a way that different joins are used internally (BroadcastNestedLoopJoin and SortMergeJoin). But this by itself does not explain why results are different as they should still be same for different internal join types.

df3.explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
:   - *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
:      - *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
 - BroadcastExchange IdentityBroadcastMode, [id=#9250]
    - *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
       - *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]

df6.explain()

== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
:   - Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
:      - *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
:         - *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
 - *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
    - Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
       - *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]

CodePudding user response:

Spark for some reason doesn't distinguish your c1 and c2 columns correctly. This is the fix for df3 to have your expected result:

df3 = df2.alias('df2').join(df1.alias('df1'), (F.col('df1.c1') == F.col('df2.c2')), 'full')
df3.show()

# Output
#  ---- ------ ---- ---- --- ------ ---- --- 
# |  ID|Status|  c1|  c2| ID|Status|  c1| c2|
#  ---- ------ ---- ---- --- ------ ---- --- 
# |   4|    ok|null|   A|  1|   bad|   A|  A|
# |null|  null|null|null|  4|    ok|null|  A|
#  ---- ------ ---- ---- --- ------ ---- --- 

CodePudding user response:

As ptlc explain in his answer, Spark is not selecting the right columns for your join. Let's find out why.

What's under the hood ?

Let's start by getting physical plan on df1 and df2 using explain. Here is the physical plan for df1:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
 - *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

And here is the physical plan for df2:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
 - *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
    - *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

You can see in first line starting by (1) Project that the two dataframes df1 and df2 has the same columns names and ids: [ID#0L, Status#1, c1#2, A AS c2#6]. It is not surprising because df2 was created from df1, so you can see df2 as df1 with additional transformations. So we have the following references:

  • df1.c1 <=> df2.c1 <=> c1#2
  • df1.c2 <=> df2.c2 <=> A AS c2#6

And when you join df1 and df2, it means that you do a self-join. And all the following combinations of your condition:

  • df1.c1 = df2.c2
  • df1.c2 = df2.c1
  • df2.c1 = df1.c2
  • df2.c2 = df1.c1

will be translated as c1#2 = A AS c2#6, that gives you with simplication the join condition c1#2 = A:

When you perform a self-join with Spark, Spark will regenerate columns ids of right dataframe to avoid having same column ids in the final dataframe. So in your case it will rewrite column ids of df1. So column c1#2 will refer to column c1 of df2.

Now your condition doesn't contain any columns from df1, then Spark will chose to perform cartesian product as join strategy. As one of the two dataframe is small enough to be broadcasted, selected algorithm will be BroadcastNestedLoopJoin. It is what the physical plan of df3 shows:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
:   - *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
:      - *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
 - BroadcastExchange IdentityBroadcastMode, [id=#75]
    - *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
       - *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]

Note that the four new column ids of df1 are now [ID#46L, Status#47, c1#48, A AS c2#45].

And so when you execute this plan, as for the unique row of df2, value of c1 is null thus is different from A, join condition is always false. As you chose full outer join, you get the three rows (two from df1, one from df2) with null in columns coming from the other dataframe:

 ---- ------ ---- ---- ---- ------ ---- ---- 
|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
 ---- ------ ---- ---- ---- ------ ---- ---- 
|   4|    ok|null|   A|null|  null|null|null|
|null|  null|null|null|   1|   bad|   A|   A|
|null|  null|null|null|   4|    ok|null|   A|
 ---- ------ ---- ---- ---- ------ ---- ---- 

Why for the second run I have the desired output ?

For the second run, you create two independent dataframes. So if we look at physical plan of df4 and df5, you can see that the column ids are different. Here is the physical plan of df4

== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]

And here is the physical plan of df5:

== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
 - *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]

Your join condition is c1#100 = c2#126, c1#100 is c1 column from df4 and c2#126 is c2 column from df5. Each end of equality in join condition is from different dataframes, Spark can perform join as you expected.

Why this is not detected as Ambiguous Self Join ?

Spark usually checks that the columns you're using for join are not ambiguous. If you invert order of df2 and df1 when joining them as follow:

df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')

You get the following error:

pyspark.sql.utils.AnalysisException: Column c2#6 are ambiguous.

So why we don't have this error when executing df2.join(df1, ...) ?

You have your answer in file DetectAmbiguousSelfJoin in Spark's code:

// When self-join happens, the analyzer asks the right side plan to generate
// attributes with new exprIds. If a plan of a Dataset outputs an attribute which
// is referred by a column reference, and this attribute has different exprId than
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.

It means that when doing df2.join(df1, ...), we will only check columns used in join condition againt df1. As in our case we didn't perform any transformation on df1, contrary to df2 that was filtered, exprIds of df1 columns didn't change and thus no ambiguous columns error is raised.

How to anticipate such behaviour ?

You have to be very careful about wether your join is a self join. If you start from a dataframe df1, perform some transformation on it to get df2, and then join df1 and df2 you risk getting such behaviour. To mitigate that, you should always put the original dataframe as first dataframe when doing a join, so having df1.join(df2, ...) instead of df2.join(df1, ...). By doing so, you will get an Analysis Exception: Column x are ambiguous if Spark doesn't manage to select the right columns.

  • Related