I am new to Scala. I have three tables.
A:
Marketplace | Level | Band |
---|---|---|
US | LEVEL_1 | |
CA | LEVEL_1 | BAND_1 |
B:
Marketplace | Level | Value |
---|---|---|
US | LEVEL_1 | 10 |
C:
Marketplace | Level | Band | Value |
---|---|---|---|
CA | LEVEL_1 | BAND_1 | 20 |
I want to:
For rows with Marketplace = US in table A, join table B on Seq(Marketplace, Level) to get the Value;
For rows with Marketplace = CA in table A, join table C on Seq(Marketplace, Level, Band) to get the Value.
The output table will be like:
Marketplace | Level | Band | Value |
---|---|---|---|
US | LEVEL_1 | | 10 |
CA | LEVEL_1 | BAND_1 | 20 |
How should I write Scala code to achieve this? Thanks!
CodePudding user response:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

// Assumes a SparkSession is already available as `spark` (as in spark-shell or a notebook);
// otherwise create one first with SparkSession.builder().getOrCreate().
import spark.implicits._

val A = Seq(("US", "LEVEL_1", ""), ("CA", "LEVEL_1", "BAND_1"))
  .toDF("Marketplace", "Level", "Band")
val B = Seq(("US", "LEVEL_1", 10)).toDF("Marketplace", "Level", "Value")
val C = Seq(("CA", "LEVEL_1", "BAND_1", 20)).toDF(
"Marketplace",
"Level",
"Band",
"Value"
)
val res = A
  // US rows match B on Marketplace + Level; CA rows match C on Marketplace + Level + Band
  .join(B, A.col("Marketplace") === B.col("Marketplace") && A.col("Level") === B.col("Level"), "left")
  .join(
    C,
    A.col("Marketplace") === C.col("Marketplace") && A.col("Level") === C.col("Level") && A.col("Band") === C.col("Band"),
    "left"
  )
  .select(
    A.col("Marketplace"),
    A.col("Level"),
    C.col("Band").alias("Band"),
    coalesce(B.col("Value"), C.col("Value")).alias("Value")
  )
res.show(false)
// +-----------+-------+------+-----+
// |Marketplace|Level  |Band  |Value|
// +-----------+-------+------+-----+
// |US         |LEVEL_1|null  |10   |
// |CA         |LEVEL_1|BAND_1|20   |
// +-----------+-------+------+-----+
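
If you prefer to follow the wording of the question more literally, here is a minimal alternative sketch (using the same A, B and C DataFrames defined above): split A by Marketplace, join each part with a Seq(...) key list so the join columns are not duplicated, then union the two results.

// Split A by Marketplace, join each subset on its own key list, then union.
val usRows = A
  .filter(col("Marketplace") === "US")
  .join(B, Seq("Marketplace", "Level"), "left")
  .select("Marketplace", "Level", "Band", "Value")

val caRows = A
  .filter(col("Marketplace") === "CA")
  .join(C, Seq("Marketplace", "Level", "Band"), "left")
  .select("Marketplace", "Level", "Band", "Value")

val res2 = usRows.unionByName(caRows)
res2.show(false)

The only visible difference from the result above is that the Band cell of the US row keeps the empty string it has in A instead of null.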