I have the below requirement in Spark Scala.
I have 2 dataframes:
DF1:
+---+----+----+----+----+
| id|sid1|pid1|sid2|pid2|
+---+----+----+----+----+
|  1|1111|null|2222|null|
+---+----+----+----+----+
DF2:
+---+----+----+----+----+
| id|sid1|pid1|sid2|pid2|
+---+----+----+----+----+
|  1|null|3333|null|4444|
+---+----+----+----+----+
I want to merge these two dataframes so that the resulting dataframe contains only the non-null values.
Expected output:
+---+----+----+----+----+
| id|sid1|pid1|sid2|pid2|
+---+----+----+----+----+
|  1|1111|3333|2222|4444|
+---+----+----+----+----+
Both input dataframes (df1 and df2) will always have a single record.
Also, I do not want to hardcode column names in the logic, because this needs to work for dataframes with 200 columns.
CodePudding user response:
I got this Java Solution working:
List<String> cols = Arrays
    .stream(leftDf.schema().names()).filter(s -> !s.equalsIgnoreCase("id"))
    .map(col -> "coalesce(leftDf." + col + ", rightDf." + col + ") AS `" + col + "`")
    .collect(Collectors.toList());
cols.add("Id");
leftDf.as("leftDf").join(rightDf.as("rightDf"), "id")
    .selectExpr(JavaConverters.asScalaIteratorConverter(cols.iterator()).asScala().toSeq())
    .show();
I'll be adding a Scala/PySpark equivalent soon, but the gist is: join the two dataframes with aliases and call coalesce(colFromLeft, colFromRight) AS col,
i.e. take the first non-null value between the two columns.
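In the meantime, a minimal Scala sketch of that approach (assuming both dataframes have identical schemas and the join key column is named id) could look like this:

import org.apache.spark.sql.DataFrame

// Build one coalesce expression per non-id column: take df1's value,
// and fall back to df2's value when df1's is null.
def mergeNonNull(df1: DataFrame, df2: DataFrame): DataFrame = {
  val merged = df1.columns
    .filterNot(_.equalsIgnoreCase("id"))
    .map(c => s"coalesce(df1.$c, df2.$c) AS `$c`")
  df1.as("df1")
    .join(df2.as("df2"), "id")
    .selectExpr("id" +: merged: _*)
}

Calling mergeNonNull(df1, df2).show() on the example above should print the single merged row.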
CodePudding user response:
From what I know, there is nothing built-in in Spark that does this directly. However, there are a lot of workarounds; I will show you one here.
First, we want to rename every column except id,
since that is what we want to join on (and so we can tell the non-null values apart):
val columns = df1.columns.filterNot(col => col == "id")
// df1 and df2 need to be vars for the reassignments below to compile
columns.foreach(col => df1 = df1.withColumnRenamed(col, s"df1_$col"))
columns.foreach(col => df2 = df2.withColumnRenamed(col, s"df2_$col"))
Then, we want to use selectExpr
to pair up the matching columns, so we construct the select expressions:
var statements = Array[String]()
columns.foreach(col => {
  statements = statements :+ s"filter(array(df1_$col, df2_$col), x -> x is not null)[0] as $col"
})
The statements array then looks like this:
Array(filter(array(df1_sid1, df2_sid1), x -> x is not null)[0] as sid1, ...)
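For what it's worth, filter(array(a, b), x -> x is not null)[0] returns the first non-null element of the array, so for a pair of columns it behaves like coalesce(a, b). A quick check (assuming a SparkSession named spark and Spark 2.4+, which introduced the filter higher-order function):

// Both columns should print 3333, the first non-null value.
spark.sql(
  "SELECT filter(array(NULL, 3333), x -> x is not null)[0] AS via_filter, " +
  "coalesce(NULL, 3333) AS via_coalesce"
).show()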
Then we do the actual work: joining and then selecting, as below:
val finalDF = df1
  .join(df2, Seq("id"), "left")
  .selectExpr(Array("id") ++ statements: _*)
Final output:
+---+----+----+----+----+
|id |sid1|pid1|sid2|pid2|
+---+----+----+----+----+
|1  |1111|3333|2222|4444|
+---+----+----+----+----+
Good luck!