Home > Mobile >  How to add a field of one dataframe to nested field of another dataframe
How to add a field of one dataframe to nested field of another dataframe

Time:06-03

I'm working on spark, using scala, i have 2 DataFrames

Schema of DF 1 -

 root
 |-- employee: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- salary: long (nullable = true)
 |    |-- dept: string (nullable = true)
 |--....

Schema of DF 2-

 root
 |-- employee: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- salary: long (nullable = true)
 |    |-- dept: string (nullable = true)
 |.   |-- phone: string (nullable = false)

how can i add phone field to employee field on DF1,

Note: not all employees of DF1 are in DF2, so if employee not present in DF2, the phone field should be set with 000

CodePudding user response:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct
case class C1(name: String, id: String, salary: Long, dept: String)
case class C2(
    name: String,
    id: String,
    salary: Long,
    dept: String,
    phone: String
)

case class E1(employee: C1)
case class E2(employee: C2)

import spark.implicits._
val empl1DF =
  Seq(
    E1(C1("n1", "1", 1, "1")),
    E1(C1("n2", "2", 2, "2")),
    E1(C1("n5", "5", 5, "5"))
  ).toDF()
val empl2DF = Seq(
  E2(C2("n1", "1", 1, "1", "1111")),
  E2(C2("n2", "2", 2, "2", "22222")),
  E2(C2("n3", "3", 3, "3", "3333"))
).toDF()

empl1DF.printSchema()
//    root
//    |-- employee: struct (nullable = true)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = false)
//    |    |-- dept: string (nullable = true)

empl1DF.show(false)
//     ------------- 
//    |employee     |
//     ------------- 
//    |[n1, 1, 1, 1]|
//    |[n2, 2, 2, 2]|
//    |[n5, 5, 5, 5]|
//     ------------- 

empl2DF.printSchema()
//    root
//    |-- employee: struct (nullable = true)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = false)
//    |    |-- dept: string (nullable = true)
//    |    |-- phone: string (nullable = true)

empl2DF.show(false)
//     -------------------- 
//    |employee            |
//     -------------------- 
//    |[n1, 1, 1, 1, 1111] |
//    |[n2, 2, 2, 2, 22222]|
//    |[n3, 3, 3, 3, 3333] |
//     -------------------- 

val df1 = empl1DF
  .join(
    empl2DF,
    empl1DF.col("employee.id") === empl2DF.col("employee.id"),
    "left"
  )
  .select(
    empl1DF.col("employee.name"),
    empl1DF.col("employee.id"),
    empl1DF.col("employee.salary"),
    empl1DF.col("employee.dept"),
    empl2DF.col("employee.phone")
  )

val resDF = df1.na
  .fill("000", Seq("phone"))
  .select(
    struct(
      col("name"),
      col("id"),
      col("salary"),
      col("dept"),
      col("phone")
    ).as("employee")
  )

resDF.printSchema()
//    root
//    |-- employee: struct (nullable = false)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = true)
//    |    |-- dept: string (nullable = true)
//    |    |-- phone: string (nullable = true)

resDF.show(false)
//     -------------------- 
//    |employee            |
//     -------------------- 
//    |[n1, 1, 1, 1, 1111] |
//    |[n2, 2, 2, 2, 22222]|
//    |[n5, 5, 5, 5, 000]  |
//     -------------------- 
  • Related