I have two dataframes:
RegionValues:
+-----------+----------+----------------------+
|marketplace|primary_id|values                |
+-----------+----------+----------------------+
|xyz        |0000000001|[cat, dog, cow]       |
|reg        |PRT0000001|[hippo, dragon, moose]|
|asz        |0000001333|[mouse, rhino, lion]  |
+-----------+----------+----------------------+
Marketplace:
+----------+-----------+----------+
|primary_id|marketplace|parent_id |
+----------+-----------+----------+
|0000000001|xyz        |PRT0000001|
|0000000002|wrt        |PRT0000001|
|PRT0000001|reg        |PRT0000001|
|PRT00MISS0|asz        |PRT00MISS0|
|000000000B|823        |PRT0000002|
+----------+-----------+----------+
When I join the dataframes, I want to join them on the primary_id value, but if a primary_id is not present in the RegionValues dataframe, I want to fall back to joining on parent_id === primary_id. So my desired output would be:
+----------+--------------+-----------+-------------------------------------+
|primary_id|marketplace   |parent_id  |values                               |
+----------+--------------+-----------+-------------------------------------+
|0000000001|...           |PRT0000001 |[cat, dog, cow]                      |
|0000000002|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT0000001|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT00MISS0|              |PRT00MISS0 |null                                 |
|0000001333|              |0000001333 |[mouse, rhino, lion]                 |
|000000000B|              |PRT0000002 |null                                 |
+----------+--------------+-----------+-------------------------------------+
Note that 0000000001 kept its original values, but 0000000002 took on its parent_id's values since it's not present in RegionValues. Is it possible to accomplish this logic within a join statement? I am using Scala and Spark.
I have tried a join statement like the one below, but it results in a null value for the 0000000002 values:
val parentIdJoinCondition = when(
  (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull,
  marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))
val joinedDf = regionDf.join(
  marketplaceDf,
  parentIdJoinCondition,
  "outer"
)
I think I could get my desired result by using 3 distinct joins, but this seems unnecessary and harder to read.
CodePudding user response:
Wouldn't using regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id") instead of (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull in your join statement help?
CodePudding user response:
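A custom join condition like this one (not a plain equality between columns) will generally result in Spark performing a cross join, which is a very inefficient way to join. Moreover, there is no way for Spark to know that a column does not match before performing the actual join: the condition (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull is evaluated for each pair of rows, and the comparison is only null when one of the two ids is itself null, so with your data it will always return false.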
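To see why the isNull test never fires, here is a toy model of SQL's three-valued equality in plain Scala. It is only a sketch of the semantics, not Spark's implementation: None stands in for SQL NULL, and the names NullLogicSketch, sqlEq and isNullExpr are made up for this illustration.

```scala
object NullLogicSketch {
  // Models `a === b`: the result is NULL (None) only if either side is NULL,
  // otherwise it is a definite true or false.
  def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Models `(a === b).isNull`: true only when the comparison itself is NULL.
  def isNullExpr(cmp: Option[Boolean]): Boolean = cmp.isEmpty
}
```

With two non-null ids such as Some("0000000002") and Some("PRT0000001"), sqlEq gives Some(false), so isNullExpr is false and the when(...) branch that joins on parent_id is never taken. It would only be taken if one of the ids were None.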
So, as you correctly guessed, the best solution is to perform several joins, and you can get away with two. The first join determines whether the primary_id or the parent_id value should be used as the key for the outer join; the second is the actual outer join. After that, you can merge the primary_id, marketplace and parent_id columns and drop the columns that are no longer needed.
So the code would be:
import org.apache.spark.sql.functions.{coalesce, col, when}

// First join: find out which marketplace rows have a direct match in regionDf
val joinedDf = marketplaceDf.join(regionDf.drop("marketplace"), Seq("primary_id"), "left")
  // Use primary_id as the key when it matched, otherwise fall back to parent_id
  .withColumn("join_key", when(col("values").isNotNull, col("primary_id")).otherwise(col("parent_id")))
  .drop("values")
  // Second join: the actual outer join on the chosen key
  .join(
    regionDf
      .withColumnRenamed("primary_id", "join_key")
      .withColumnRenamed("marketplace", "region_marketplace"),
    Seq("join_key"),
    "outer"
  )
  // Fill in the columns for rows that only exist in regionDf
  .withColumn("primary_id", coalesce(col("primary_id"), col("join_key")))
  .withColumn("parent_id", coalesce(col("parent_id"), col("join_key")))
  .withColumn("marketplace", coalesce(col("marketplace"), col("region_marketplace")))
  .drop("join_key", "region_marketplace")
That gives you the following joinedDf dataframe:
+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values                |
+----------+-----------+----------+----------------------+
|0000000001|xyz        |PRT0000001|[cat, dog, cow]       |
|0000001333|asz        |0000001333|[mouse, rhino, lion]  |
|0000000002|wrt        |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|reg        |PRT0000001|[hippo, dragon, moose]|
|000000000B|823        |PRT0000002|null                  |
|PRT00MISS0|asz        |PRT00MISS0|null                  |
+----------+-----------+----------+----------------------+
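If you want to check the key-selection logic without a Spark session, the same two steps can be modelled on plain Scala collections. This is only a rough sketch under some assumptions: the region data is represented as a Map from primary_id to (marketplace, values), and FallbackJoinSketch, Market, Joined and joinWithFallback are hypothetical names, not part of Spark or of the code above.

```scala
object FallbackJoinSketch {
  case class Market(primaryId: String, marketplace: String, parentId: String)
  case class Joined(primaryId: String, marketplace: String, parentId: String,
                    values: Option[Seq[String]])

  // region maps primary_id -> (marketplace, values); None models a null `values`.
  def joinWithFallback(markets: Seq[Market],
                       region: Map[String, (String, Seq[String])]): Seq[Joined] = {
    // Step 1: choose the join key, primary_id if it has region values, else parent_id.
    val keyed = markets.map { m =>
      val joinKey = if (region.contains(m.primaryId)) m.primaryId else m.parentId
      (joinKey, m)
    }
    // Step 2a: look up values by the chosen key (the marketplace side of the outer join).
    val matched = keyed.map { case (key, m) =>
      Joined(m.primaryId, m.marketplace, m.parentId, region.get(key).map(_._2))
    }
    // Step 2b: keep region rows that no join key pointed at (the region side of the outer join),
    // filling primary_id and parent_id from the region id, like the coalesce calls do.
    val usedKeys = keyed.map(_._1).toSet
    val regionOnly = region.collect {
      case (id, (marketplace, vs)) if !usedKeys(id) => Joined(id, marketplace, id, Some(vs))
    }
    matched ++ regionOnly
  }
}
```

Feeding it the five Marketplace rows and three RegionValues rows from the question yields six rows: 0000000002 picks up the values of PRT0000001, PRT00MISS0 and 000000000B end up with no values, and 0000001333 is carried over from the region side with its own id as parent_id.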