I'm working with Spark in Scala, and I have two DataFrames.
Schema of DF 1 -
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
|--....
Schema of DF 2-
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
| |-- phone: string (nullable = false)
How can I add the phone field to the employee struct of DF1?
Note: not all employees in DF1 are present in DF2, so if an employee is not present in DF2, the phone field should be set to 000.
CodePudding user response:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct
/** Employee payload mirroring the DF1 schema (no phone number). */
case class C1(
  name: String,
  id: String,
  salary: Long,
  dept: String
)

/** Employee payload mirroring the DF2 schema: the [[C1]] fields plus `phone`. */
case class C2(name: String, id: String, salary: Long, dept: String, phone: String)

/** Row wrapper that yields a single `employee` struct column holding a [[C1]]. */
case class E1(employee: C1)

/** Row wrapper that yields a single `employee` struct column holding a [[C2]]. */
case class E2(employee: C2)
import spark.implicits._
// Sample stand-in for DF1: three employees without a phone number.
// Needs the `spark.implicits._` import for `.toDF()`; NOTE(review): `spark` itself is
// not defined in this snippet — presumably supplied by spark-shell or a notebook.
val empl1DF = Seq(
  C1("n1", "1", 1L, "1"),
  C1("n2", "2", 2L, "2"),
  C1("n5", "5", 5L, "5")
).map(E1.apply).toDF()
// Sample stand-in for DF2: employees that carry a phone number.
// Ids 1 and 2 overlap with empl1DF; id 3 exists only here.
val empl2DF =
  Seq(
    C2("n1", "1", 1L, "1", "1111"),
    C2("n2", "2", 2L, "2", "22222"),
    C2("n3", "3", 3L, "3", "3333")
  ).map(E2.apply).toDF()
// Inspect the DF1 stand-in. The comments below are the output recorded by the author.
empl1DF.printSchema()
// root
//  |-- employee: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = false)
//  |    |-- dept: string (nullable = true)
empl1DF.show(false)
// +-------------+
// |employee     |
// +-------------+
// |[n1, 1, 1, 1]|
// |[n2, 2, 2, 2]|
// |[n5, 5, 5, 5]|
// +-------------+
// Inspect the DF2 stand-in. The comments below are the output recorded by the author.
empl2DF.printSchema()
// root
//  |-- employee: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = false)
//  |    |-- dept: string (nullable = true)
//  |    |-- phone: string (nullable = true)
empl2DF.show(false)
// +--------------------+
// |employee            |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n3, 3, 3, 3, 3333] |
// +--------------------+
// Flatten DF1's employee fields and attach the phone from DF2.
// A left join keeps every DF1 row; rows without a DF2 match get a null phone.
val df1 = {
  // Match rows on the nested employee id of each side.
  val sameEmployeeId = empl1DF("employee.id") === empl2DF("employee.id")
  val joined = empl1DF.join(empl2DF, sameEmployeeId, "left")

  joined.select(
    empl1DF("employee.name"),
    empl1DF("employee.id"),
    empl1DF("employee.salary"),
    empl1DF("employee.dept"),
    empl2DF("employee.phone")
  )
}
// Final result: DF1's employees re-packed into a single `employee` struct that now
// includes `phone`.
//  1. Replace a null phone (employee absent from DF2) with the default "000".
//  2. Rebuild the struct from the flat columns, in schema order.
// The by-name `struct` overload is used because only `functions.struct` is imported
// in this snippet; the previous `col(...)` calls referenced a name that was never
// brought into scope.
val resDF = df1.na
  .fill("000", Seq("phone"))
  .select(
    struct("name", "id", "salary", "dept", "phone").as("employee")
  )
// Inspect the result. The comments below are the output recorded by the author.
// The last row (id 5) has no DF2 match, so its phone is the "000" default.
resDF.printSchema()
// root
//  |-- employee: struct (nullable = false)
//  |    |-- name: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- salary: long (nullable = true)
//  |    |-- dept: string (nullable = true)
//  |    |-- phone: string (nullable = true)
resDF.show(false)
// +--------------------+
// |employee            |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n5, 5, 5, 5, 000]  |
// +--------------------+