Spark dataframe join aggregating by ID

I have a problem joining two dataframes grouped by ID.

val df1 = Seq(
  (1, 1, 100),
  (1, 3, 20),
  (2, 5, 5),
  (2, 2, 10)).toDF("id", "index", "value")

val df2 = Seq(
  (1, 0),
  (2, 0),
  (3, 0),
  (4, 0),
  (5, 0)).toDF("index", "value")

For every id in df1, I want to join df1 with df2 on the index column, so that each id gets a row for every index (with value 0 where df1 has no entry).

Expected result:

id  index  value
1   1      100
1   2      0
1   3      20
1   4      0
1   5      0
2   1      0
2   2      10
2   3      0
2   4      0
2   5      5
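For reference, the desired semantics can be sketched in plain Scala (no Spark): cross the distinct ids with every index, then look up each (id, index) pair in df1's rows, defaulting to 0 when the pair is absent.

```scala
// Plain-Scala sketch of the desired result (no Spark needed):
// cross the distinct ids with every index, then look up values,
// defaulting to 0 when the (id, index) pair is missing from df1.
val df1 = Seq((1, 1, 100), (1, 3, 20), (2, 5, 5), (2, 2, 10))
val indexes = Seq(1, 2, 3, 4, 5)

val values = df1.map { case (id, idx, v) => (id, idx) -> v }.toMap
val ids = df1.map(_._1).distinct.sorted

val result = for {
  id  <- ids
  idx <- indexes
} yield (id, idx, values.getOrElse((id, idx), 0))

result.foreach(println)
```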

Please help me with this.

CodePudding user response:

First of all, I would replace your df2 table with this:

import org.apache.spark.sql.functions.{col, explode, when}

var df2 = Seq(
  (Array(1, 2), Array(1, 2, 3, 4, 5))
).toDF("id", "index")

This lets us use explode to auto-generate the full (id, index) cross product:

df2 = df2
  .withColumn("id", explode(col("id")))
  .withColumn("index", explode(col("index")))

and it gives:

+---+-----+
|id |index|
+---+-----+
|1  |1    |
|1  |2    |
|1  |3    |
|1  |4    |
|1  |5    |
|2  |1    |
|2  |2    |
|2  |3    |
|2  |4    |
|2  |5    |
+---+-----+

Now all we need to do is left-join with your df1 and replace the nulls with 0:

df2 = df2
  .join(df1, Seq("id", "index"), "left")
  .withColumn("value", when(col("value").isNull, 0).otherwise(col("value")))
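As an aside, a sketch of an alternative (not tested here): instead of hardcoding the id and index arrays, the same scaffold can be built from the data itself by cross-joining the distinct ids in df1 with the index column of the question's original df2, then left-joining and filling nulls with coalesce.

```scala
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Build the (id, index) scaffold from the data itself: every distinct id
// in df1 paired with every index in the original df2, then left-join df1
// and fill missing values with 0.
val result = df1.select("id").distinct()
  .crossJoin(df2.select("index"))
  .join(df1, Seq("id", "index"), "left")
  .withColumn("value", coalesce(col("value"), lit(0)))
  .orderBy("id", "index")
```

This avoids maintaining the arrays by hand when new ids or indexes appear in the data.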

And we get this final output:

+---+-----+-----+
|id |index|value|
+---+-----+-----+
|1  |1    |100  |
|1  |2    |0    |
|1  |3    |20   |
|1  |4    |0    |
|1  |5    |0    |
|2  |1    |0    |
|2  |2    |10   |
|2  |3    |0    |
|2  |4    |0    |
|2  |5    |5    |
+---+-----+-----+

which should be what you want. Good luck!
