I know similar questions have been asked here, but nobody has raised the case where one or more of the dataframes in the reduce are empty. With an inner join, the final result in that case will always be empty.
If I have:
dfList = List(
+------+---+   +------+--------+   +------+---------+   +------+----+
|    ID|  A|   |    ID|       B|   |    ID|        C|   |    ID|   D|
+------+---+   +------+--------+   +------+---------+   +------+----+
|  9574|  F|   |  9574|  005912|   |  9574|  2016022|   |  9574|  VD|
|  9576|  F|   |  9576|  005912|   |  9576|  2016022|   |  9576|  VD|
|  9578|  F|   |  9578|  005912|   |  9578|  2016022|   |  9578|  VD|
|  9580|  F|   |  9580|  005912|   |  9580|  2016022|   |  9580|  VD|
|  9582|  F|   |  9582|  005912|   |  9582|  2016022|   |  9582|  VD|
+------+---+ , +------+--------+ , +------+---------+ , +------+----+
)
and I want to reduce this list into a single dataframe, I could easily do:
dfList.reduce(
  (df1, df2) =>
    df1.join(df2, Seq("ID")).localCheckpoint(true)
)
Whatever the join is (inner, left, or right), if one of the first couple of dataframes is empty, the final result will be empty.
One way could be:
dfList.filter(!_.rdd.isEmpty())
but this takes a long time, so it performs poorly. Do you have any suggestions?
CodePudding user response:
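If you are on Spark 3.1 or later, you could use unionByName followed by a group by on ID, taking the first non-null value of each column (f is pyspark.sql.functions):
df1.unionByName(df2, allowMissingColumns=True).groupBy("ID") \
    .agg(f.first("A", ignorenulls=True).alias("A"), f.first("B", ignorenulls=True).alias("B"))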
In other versions you can do something similar, but I haven't tested the performance. (There's a catch: it only works on numeric fields, so you'd likely have to do some translation back and forth.)
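from pyspark.sql import functions as f

# Stack both dataframes as (ID, col1, table), then pivot the table name back into columns.
# The outer parentheses let the chained calls span several lines.
(
    df2.select(
        df2.ID,
        df2.B.alias("col1"),
        f.lit("B").alias("table"))
    .union(
        df1.select(
            df1.ID,
            df1.A.alias("col1"),
            f.lit("A").alias("table")))
    .groupBy("ID")
    .pivot("table")
    .max("col1")  # max() only accepts numeric columns
    .show()
)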
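Since the question is in Scala, the union-then-group idea above can be sketched for the whole list as follows. This is only a sketch under assumptions: Spark 3.1 or later, at most one row per ID in each dataframe, and non-key column names that do not clash. The object UnionThenGroup and the helper mergeOnKey are hypothetical names, not part of Spark. Under those assumptions the result behaves like a full outer join on the key, so an empty dataframe only contributes a column of nulls instead of emptying the result.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.first

object UnionThenGroup {
  // Hypothetical helper: merges dataframes that share a key column without joining them.
  // Assumes at least one non-key column exists across the inputs.
  def mergeOnKey(dfs: Seq[DataFrame], key: String): DataFrame = {
    // Stack all rows; columns missing from one side are filled with nulls (Spark 3.1+).
    val unioned = dfs.reduce((a, b) => a.unionByName(b, allowMissingColumns = true))
    // For every non-key column, keep the first non-null value seen for each key.
    val aggs = unioned.columns
      .filterNot(_ == key)
      .map(c => first(c, ignoreNulls = true).as(c))
    unioned.groupBy(key).agg(aggs.head, aggs.tail: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("union-then-group-sketch")
      .getOrCreate()
    import spark.implicits._

    // Small stand-ins for the dataframes in the question.
    val df1 = Seq((9574, "F"), (9576, "F")).toDF("ID", "A")
    val df2 = Seq((9574, "005912"), (9576, "005912")).toDF("ID", "B")
    val emptyDf = Seq.empty[(Int, String)].toDF("ID", "E")

    mergeOnKey(Seq(df1, emptyDf, df2), "ID").show(false)
    spark.stop()
  }
}
```

Unlike an inner join, this keeps IDs that appear in only some of the dataframes, so it is a change in semantics, not only a performance trick.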
CodePudding user response:
I guess the performance hit comes from trying to determine whether any of the involved dataframes is empty (i.e. building the dataframes), not from a lack of optimization on Catalyst's part. Once Spark "knows" there is an empty df in that join, the whole plan collapses into a single line:
scala> val emptydfschema = Seq("ID","E")
scala> val emptydf = Seq.empty[(Integer,String)].toDF(emptydfschema: _*)
scala> val dfList = List(df1,df2,df3,df4,emptydf)
dfList: List[org.apache.spark.sql.DataFrame] = List([ID: int, A: string], [ID: int, B: string], [ID: int, C: string], [ID: int, D: int], [ID: int, E: string])
scala> dfList.reduce((df1, df2) => df1.join(df2, Seq("ID"))).explain()
== Physical Plan ==
LocalTableScan <empty>, [ID#xx, A#xx, B#xx, C#xx, D#xx, E#xx]
scala> dfList.reduce((df1, df2) => df1.join(df2, Seq("ID"))).show(false)
+---+---+---+---+---+---+
|ID |A  |B  |C  |D  |E  |
+---+---+---+---+---+---+
+---+---+---+---+---+---+
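If the empty dataframes should be dropped before the reduce, as the question attempts with rdd.isEmpty(), one possible variant is Dataset.isEmpty, available since Spark 2.4. It stays in the Dataset API instead of converting to an RDD, so it may be cheaper, but I have not measured it, and it still runs one job per dataframe to build enough of each one to find a first row. The object DropEmptyBeforeJoin and the helper joinNonEmpty below are hypothetical names used only for this sketch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DropEmptyBeforeJoin {
  // Hypothetical helper: inner-joins only the non-empty dataframes on `key`.
  // Returns None when every input is empty (a plain reduce would throw on an empty list).
  def joinNonEmpty(dfs: Seq[DataFrame], key: String): Option[DataFrame] =
    dfs
      .filterNot(_.isEmpty) // Dataset.isEmpty, Spark 2.4+
      .reduceOption((left, right) => left.join(right, Seq(key)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("drop-empty-sketch")
      .getOrCreate()
    import spark.implicits._

    // Small stand-ins for the dataframes in the question.
    val df1 = Seq((9574, "F"), (9576, "F")).toDF("ID", "A")
    val df2 = Seq((9574, "005912"), (9576, "005912")).toDF("ID", "B")
    val emptyDf = Seq.empty[(Int, String)].toDF("ID", "E")

    joinNonEmpty(Seq(df1, emptyDf, df2), "ID").foreach(_.show(false))
    spark.stop()
  }
}
```

Note that the columns of the dropped dataframes (E in this example) do not appear in the result at all, which may or may not be what the downstream code expects.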