I am running a simple join in two versions of Spark, 2.4 and 3.2. The code is as below:
spark-shell --master yarn --deploy-mode client
val employeeDF = spark.createDataFrame(Seq((1,"Amit","2022","DAP"),(2,"Simran","2022","DAP"),(3,"Purna","2019","DAP"))).toDF("emp_id","name","year_joined","dept_name")
val deptDF = spark.createDataFrame(Seq(("DAP",1),("DAP",2),("DAP",3))).toDF("dept_name", "emp")
val joinedDF = employeeDF.join(deptDF, Seq("dept_name")).as("df1").select($"df1.*")
joinedDF.printSchema
This code produces different outputs in the two versions: Spark 2.4 returns unique columns, while Spark 3.2 returns duplicate columns.
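For reference, the printSchema output in each version looks roughly like this (reconstructed from the column lists in the plans below, so the nullable flags are approximate):

Spark 2.4:
root
 |-- dept_name: string (nullable = true)
 |-- emp_id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp: integer (nullable = false)

Spark 3.2:
root
 |-- dept_name: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- emp_id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp: integer (nullable = false)

The joinedDF.explain output for both versions is as follows: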
Apache Spark 2.4
________________
*(2) Project [dept_name#11, emp_id#8, name#9, year_joined#10, emp#21]
+- *(2) BroadcastHashJoin [dept_name#11], [dept_name#20], Inner, BuildRight
   :- *(2) Project [_1#0 AS emp_id#8, _2#1 AS name#9, _3#2 AS year_joined#10, _4#3 AS dept_name#11]
   :  +- *(2) Filter isnotnull(_4#3)
   :     +- LocalTableScan [_1#0, _2#1, _3#2, _4#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [_1#16 AS dept_name#20, _2#17 AS emp#21]
         +- *(1) Filter isnotnull(_1#16)
            +- LocalTableScan [_1#16, _2#17]
Apache Spark 3.2
________________
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [dept_name#20, dept_name#11, emp_id#8, name#9, year_joined#10, emp#21]
   +- BroadcastHashJoin [dept_name#11], [dept_name#20], Inner, BuildRight, false
      :- LocalTableScan [emp_id#8, name#9, year_joined#10, dept_name#11]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#16]
         +- LocalTableScan [dept_name#20, emp#21]
I could not find a way to restore the old behaviour in Spark 3.2 through configuration, and changing the code would be a lot of effort since multiple jobs have the same issue. Any idea on a configuration to fix this?
I tried disabling AQE and setting the recursive CTE setting to LEGACY; I still see different results.
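For reproducibility, these are the settings I mean (the AQE flag is the standard one; for the CTE setting I am assuming spark.sql.legacy.ctePrecedencePolicy is the config in question):

// Disable adaptive query execution for the session
spark.conf.set("spark.sql.adaptive.enabled", "false")
// Assumed legacy CTE setting -- this is my guess at the config referenced above
spark.conf.set("spark.sql.legacy.ctePrecedencePolicy", "LEGACY")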
Since this DataFrame returns duplicate columns, it also fails in group-by operations with an ambiguous column name error.
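A minimal illustration of that follow-on failure (my own sketch, not the production job):

// Any aggregation touching the duplicated column fails to resolve in 3.2
joinedDF.groupBy("dept_name").count()
// throws org.apache.spark.sql.AnalysisException: Reference 'dept_name' is ambiguous ...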
CodePudding user response:
I can reproduce your case on Spark 3.2.
I did some quick research and found that it may be connected to this issue: https://issues.apache.org/jira/browse/SPARK-39376
The problem was introduced in 3.2 and fixed in 3.2.2 and 3.3.
I tested it on 3.3 and it works, so I think an upgrade may help you too.
As it was a really small bug, I don't think there is any configuration you can change to work around it.
If you can't upgrade your Spark version, unfortunately you need to get rid of the star expansion or drop the duplicate column explicitly after the select.
In your case I removed .select($"df1.*") and just called .show on the joined DataFrame, and it was fine, but I don't know the full context of your code, so it's hard to tell what will work in your case.
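A rough sketch of both workarounds, using the DataFrames from your question (the variable names are mine, and I haven't verified the drop variant on an affected 3.2.x build, so treat it as a starting point):

// Option 1: skip the star expansion -- joining with Seq("dept_name")
// already deduplicates the join key
val joinedNoStar = employeeDF.join(deptDF, Seq("dept_name"))
joinedNoStar.printSchema  // single dept_name column

// Option 2: keep the star expansion but drop the right-hand copy
// afterwards; drop(Column) resolves by the underlying attribute,
// not by name, so only deptDF's dept_name should be removed
val joinedDropped = employeeDF.join(deptDF, Seq("dept_name")).as("df1")
  .select($"df1.*")
  .drop(deptDF("dept_name"))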