Scala/Spark join on a second key if the first key doesn't exist in one of the dataframes

Time:11-23

I have two dataframes:

 RegionValues:
+-----------+----------+----------------------+
|marketplace|primary_id|values                |
+-----------+----------+----------------------+
|xyz        |0000000001|[cat, dog, cow]       |
|reg        |PRT0000001|[hippo, dragon, moose]|
|asz        |0000001333|[mouse, rhino, lion]  |
+-----------+----------+----------------------+

Marketplace:
 
+----------+-----------+----------+
|primary_id|marketplace|parent_id |
+----------+-----------+----------+
|0000000001|xyz        |PRT0000001|
|0000000002|wrt        |PRT0000001|
|PRT0000001|reg        |PRT0000001|
|PRT00MISS0|asz        |PRT00MISS0|
|000000000B|823        |PRT0000002|
+----------+-----------+----------+
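For reference, the two sample dataframes above can be rebuilt like this (a minimal sketch, assuming a local SparkSession; the question and answers refer to them as regionValuesDf/regionDf and marketplaceDf):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// The RegionValues dataframe from the question
val regionValuesDf = Seq(
  ("xyz", "0000000001", Seq("cat", "dog", "cow")),
  ("reg", "PRT0000001", Seq("hippo", "dragon", "moose")),
  ("asz", "0000001333", Seq("mouse", "rhino", "lion"))
).toDF("marketplace", "primary_id", "values")
val regionDf = regionValuesDf // shorter alias also used in this thread

// The Marketplace dataframe from the question
val marketplaceDf = Seq(
  ("0000000001", "xyz", "PRT0000001"),
  ("0000000002", "wrt", "PRT0000001"),
  ("PRT0000001", "reg", "PRT0000001"),
  ("PRT00MISS0", "asz", "PRT00MISS0"),
  ("000000000B", "823", "PRT0000002")
).toDF("primary_id", "marketplace", "parent_id")
```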

When I join the dataframes together, I want to join them on the primary_id value, but if the primary_id is not present in the RegionValues dataframe, I want to fall back to joining on parent_id === primary_id. So my desired output would be:

+----------+--------------+-----------+-------------------------------------+
|primary_id|marketplace   |parent_id  |values                               |
+----------+--------------+-----------+-------------------------------------+
|0000000001|...           |PRT0000001 |[cat, dog, cow]                      |
|0000000002|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT0000001|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT00MISS0|              |PRT00MISS0 |null                                 |
|0000001333|              |0000001333 |[mouse, rhino, lion]                 |
|000000000B|              |PRT0000002 |null                                 |
+----------+--------------+-----------+-------------------------------------+

Note that 0000000001 kept its own values, but 0000000002 took on its parent_id's values since it's not present in RegionValues. Is it possible to accomplish this logic within a single join statement? I am using Scala and Spark.

I have tried a join condition like this, but it results in null values for 0000000002:

val parentIdJoinCondition = when(
    (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull,
    marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
  ).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))

val joinedDf = regionValuesDf.join(
    marketplaceDf,
    parentIdJoinCondition,
    "outer"
)

I think I could get my desired result by using 3 distinct joins, but that seems unnecessary and harder to read.
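For reference, the three-join fallback mentioned above could be sketched roughly like this (an untested sketch, assuming the marketplaceDf and regionDf names used elsewhere in this thread):

```scala
import org.apache.spark.sql.functions.{col, lit}

val regionVals = regionDf.drop("marketplace")

// 1) Marketplace rows whose primary_id exists directly in RegionValues
val direct = marketplaceDf.join(regionVals, Seq("primary_id"), "inner")

// 2) Remaining marketplace rows, falling back to a lookup on parent_id
val fallback = marketplaceDf
  .join(regionVals, Seq("primary_id"), "left_anti")
  .join(regionVals.withColumnRenamed("primary_id", "parent_id"), Seq("parent_id"), "left")
  .select("primary_id", "marketplace", "parent_id", "values")

// 3) RegionValues rows matched by neither join (e.g. 0000001333)
val leftovers = regionVals
  .join(marketplaceDf.select("primary_id"), Seq("primary_id"), "left_anti")
  .join(marketplaceDf.select(col("parent_id").as("primary_id")).distinct, Seq("primary_id"), "left_anti")
  .select(col("primary_id"), lit(null).cast("string").as("marketplace"),
          col("primary_id").as("parent_id"), col("values"))

val result = direct.select("primary_id", "marketplace", "parent_id", "values")
  .unionByName(fallback)
  .unionByName(leftovers)
```

Three joins plus two left-anti joins and a union: it works, but as noted, it is considerably harder to read than a two-join solution.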

CodePudding user response:

Shouldn't regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id") instead of (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull in your join condition help?

CodePudding user response:

Creating custom conditions like this will result in Spark performing a cross-join, which is a very inefficient way to join. Moreover, there is no way for Spark to know that a key has no match before actually performing the join: the condition is evaluated on one row from each side at a time, so (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull will always return false.
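To illustrate that last point (a minimal standalone sketch with made-up data): === compares one row from each side per candidate pair, so its result is null only when one of the operands itself is null, never when "no matching row exists" on the other side:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val left  = Seq("A", "B").toDF("id")
val right = Seq("A", "C").toDF("id2")

// For every candidate pair (id, id2), col("id") === col("id2") evaluates
// to true or false -- never null, since both operands are non-null -- so a
// when((... === ...).isNull, ...) branch is never taken.
left.crossJoin(right)
  .select(col("id"), col("id2"),
          (col("id") === col("id2")).isNull.as("cond_is_null"))
  .show() // cond_is_null is false on every row
```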

So, as you correctly guessed, the best solution is to perform several joins, and you can get it down to two: a first join to determine whether the primary_id or the parent_id value should be used as the key for the outer join, and then the actual outer join. Finally, you can merge the primary_id, marketplace, and parent_id columns and drop the helper columns.

So the code would be:

import org.apache.spark.sql.functions.{coalesce, col, when}

val joinedDf = marketplaceDf
  // First join: find out which marketplace rows have a direct match in regionDf
  .join(regionDf.drop("marketplace"), Seq("primary_id"), "left")
  // If values is null there was no direct match, so fall back to parent_id
  .withColumn("join_key", when(col("values").isNotNull, col("primary_id")).otherwise(col("parent_id")))
  .drop("values")
  // Second join: the actual outer join on the resolved key
  .join(
    regionDf
      .withColumnRenamed("primary_id", "join_key")
      .withColumnRenamed("marketplace", "region_marketplace"),
    Seq("join_key"),
    "outer"
  )
  // Fill in columns for region rows that matched no marketplace row
  .withColumn("primary_id", coalesce(col("primary_id"), col("join_key")))
  .withColumn("parent_id", coalesce(col("parent_id"), col("join_key")))
  .withColumn("marketplace", coalesce(col("marketplace"), col("region_marketplace")))
  .drop("join_key", "region_marketplace")

That gives you the following joinedDf dataframe:

+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values                |
+----------+-----------+----------+----------------------+
|0000000001|xyz        |PRT0000001|[cat, dog, cow]       |
|0000001333|asz        |0000001333|[mouse, rhino, lion]  |
|0000000002|wrt        |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|reg        |PRT0000001|[hippo, dragon, moose]|
|000000000B|823        |PRT0000002|null                  |
|PRT00MISS0|asz        |PRT00MISS0|null                  |
+----------+-----------+----------+----------------------+