Different result for same query in Spark 2.3 vs Spark 3.2


I am running a simple join on two versions of Spark, 2.3 and 3.2. The code is as below:

spark-shell --master yarn --deploy-mode client

val employeeDF = spark.createDataFrame(Seq(
  (1, "Amit", "2022", "DAP"),
  (2, "Simran", "2022", "DAP"),
  (3, "Purna", "2019", "DAP")
)).toDF("emp_id", "name", "year_joined", "dept_name")

val deptDF = spark.createDataFrame(Seq(
  ("DAP", 1), ("DAP", 2), ("DAP", 3)
)).toDF("dept_name", "emp")

val joinedDF = employeeDF.join(deptDF, Seq("dept_name")).as("df1").select($"df1.*")

joinedDF.printSchema

This code produces two different outputs. In Spark 2.4 it returns unique columns, while in Spark 3.2 it returns a duplicated dept_name column. joinedDF.explain for both versions is as follows:

Apache Spark 2.4
________________

*(2) Project [dept_name#11, emp_id#8, name#9, year_joined#10, emp#21]
+- *(2) BroadcastHashJoin [dept_name#11], [dept_name#20], Inner, BuildRight
   :- *(2) Project [_1#0 AS emp_id#8, _2#1 AS name#9, _3#2 AS year_joined#10, _4#3 AS dept_name#11]
   :  +- *(2) Filter isnotnull(_4#3)
   :     +- LocalTableScan [_1#0, _2#1, _3#2, _4#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [_1#16 AS dept_name#20, _2#17 AS emp#21]
         +- *(1) Filter isnotnull(_1#16)
            +- LocalTableScan [_1#16, _2#17]

Apache Spark 3.2
________________

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [dept_name#20, dept_name#11, emp_id#8, name#9, year_joined#10, emp#21]
   +- BroadcastHashJoin [dept_name#11], [dept_name#20], Inner, BuildRight, false
      :- LocalTableScan [emp_id#8, name#9, year_joined#10, dept_name#11]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#16]
         +- LocalTableScan [dept_name#20, emp#21]
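
The duplication is visible in the top-level Project of each plan; a quick check from the shell confirms it:

println(joinedDF.columns.mkString(", "))
// Spark 2.4 prints: dept_name, emp_id, name, year_joined, emp
// Spark 3.2 prints: dept_name, dept_name, emp_id, name, year_joined, emp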

I could not find a way to restore the old behaviour in Spark 3.2 through configuration, and changing the code would be a lot of effort, as multiple jobs have the same issue. Any idea of a configuration change that would fix this?

I tried disabling AQE and setting the recursive CTE setting to LEGACY, but I still see different results.
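
Roughly what I tried (a sketch; spark.sql.legacy.ctePrecedencePolicy is my best guess at the relevant CTE property):

spark.conf.set("spark.sql.adaptive.enabled", "false")            // disable AQE
spark.conf.set("spark.sql.legacy.ctePrecedencePolicy", "LEGACY") // assumed CTE setting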

Since this DataFrame returns duplicate columns, it also fails in group-by operations with an ambiguous column name error.
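
For example, an aggregation along these lines (a sketch, not one of the actual jobs) fails on 3.2:

// AnalysisException on Spark 3.2: reference to dept_name is ambiguous
joinedDF.groupBy("dept_name").count().show()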

CodePudding user response:

I can reproduce your case on Spark 3.2.

I did some quick research and found that it may be connected to this issue: https://issues.apache.org/jira/browse/SPARK-39376

The problem was introduced in 3.2 and fixed in 3.2.2 and 3.3.

I tested it on 3.3 and it works, so I think an upgrade may be helpful for you too.

As it was a really small bug, I don't think there is any configuration you can change.

If you can't simply upgrade your Spark, you unfortunately need to get rid of the star expansion or drop the column explicitly after the select, as shown below.
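
For example, a sketch using the DataFrames from your question that selects the columns explicitly instead of expanding df1.*:

val fixedDF = employeeDF
  .join(deptDF, Seq("dept_name"))
  .select("dept_name", "emp_id", "name", "year_joined", "emp") // explicit list, no star expansion
fixedDF.printSchema // dept_name appears once on both versions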

In your case I removed .select($"df1.*") and just called .show on df1, and it was fine, but I don't know the full context of your code, so it's hard to tell what will work best for you.
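
Roughly what I ran (a minimal sketch):

val df1 = employeeDF.join(deptDF, Seq("dept_name")).as("df1")
df1.show() // without .select($"df1.*"), dept_name appears only once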
