Spark/Scala - Join resultset giving type mismatch error while performing withColumn operation; found

Time:07-02

I have two DataFrames that I am trying to join and, based on the joined result, assign a flag column.

demandDF1

+--------+-----------+---------+
|rgn_nm  |file_crt_dt|file_vrsn|
+--------+-----------+---------+
|DAO     |2022-06-30 |1        |
|DAO     |2022-06-30 |1        |
|CCC     |2022-06-30 |1        |
|APCC    |2022-06-30 |1        |
|ODM     |2022-06-29 |3        |
|EMF     |2022-06-30 |1        |
|T2Region|2022-06-29 |4        |
|BCC     |2022-06-30 |1        |
|EMF     |2022-07-01 |1        |
+--------+-----------+---------+

outputDistinctDF

+------+-----------+---------+
|region|file_crt_dt|file_vrsn|
+------+-----------+---------+
|DAO   |2022-06-30 |1        |
|CCC   |2022-06-29 |1        |
|APCC  |2022-06-30 |1        |
|ODM   |2022-06-29 |2        |
|EMF   |2022-06-30 |1        |
|BCC   |2022-06-30 |1        |
+------+-----------+---------+

I am trying to achieve something like the following.

+------------+-----------------+---------------+-------------+------------------+----------------+----+
|input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|DAO         |2022-06-30       |1              |DAO          |2022-06-30        |1               |0   |
|CCC         |2022-06-30       |1              |CCC          |2022-06-29        |1               |1   |
|T2Region    |2022-06-29       |4              |null         |null              |null            |1   |
|ODM         |2022-06-29       |3              |ODM          |2022-06-29        |2               |1   |
|APCC        |2022-06-30       |1              |APCC         |2022-06-30        |1               |0   |
|EMF         |2022-07-01       |1              |EMF          |2022-06-30        |1               |1   |
|EMF         |2022-06-30       |1              |EMF          |2022-06-30        |1               |0   |
|BCC         |2022-06-30       |1              |BCC          |2022-06-30        |1               |0   |
+------------+-----------------+---------------+-------------+------------------+----------------+----+

The logic:

(input_file_crt_dt > output_file_crt_dt ) or
(input_file_crt_dt = output_file_crt_dt and input_file_vrsn > output_file_vrsn) or
(output_region is null)

then flag = 1 else 0
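
The rule above can be sketched in plain Scala, outside Spark, to make the null case explicit. This is only an illustration: `FileInfo` and `FlagRule.flag` are hypothetical names that do not appear in the original code, and `None` stands in for the nulls a left outer join produces when no output row matches.

```scala
import java.time.LocalDate

// Hypothetical row type for illustration; not part of the original code.
final case class FileInfo(crtDt: LocalDate, vrsn: Int)

object FlagRule {
  // `output` is None when the left outer join found no matching region.
  def flag(input: FileInfo, output: Option[FileInfo]): Int = output match {
    case None => 1 // output_region is null
    case Some(out) =>
      val newerDate = input.crtDt.isAfter(out.crtDt)
      val sameDateNewerVersion =
        input.crtDt.isEqual(out.crtDt) && input.vrsn > out.vrsn
      if (newerDate || sameDateNewerVersion) 1 else 0
  }
}
```

Under this sketch, the ODM row (same date, input version 3 against output version 2) yields 1, and the T2Region row (no match) also yields 1, in line with the expected table.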

I have tried the following code, but it fails with a type mismatch error on the withColumn operation.

Steps that I have followed:

val demandDF1 = Seq(("DAO","2022-06-30","1"),
("DAO","2022-06-30","1"),
("CCC","2022-06-30","1"),
("APCC","2022-06-30","1"),
("ODM","2022-06-29","3"),
("EMF","2022-06-30","1"),
("T2Region","2022-06-29","4"),
("BCC","2022-06-30","1"),
("EMF","2022-07-01","1")).toDF("rgn_nm","file_crt_dt","file_vrsn").withColumn("file_crt_dt", col("file_crt_dt").cast("date")).withColumn("file_vrsn", col("file_vrsn").cast("int"))

val outputDistinctDF = Seq(("DAO","2022-06-30","1"),
("CCC","2022-06-29","1"),
("APCC","2022-06-30","1"),
("ODM","2022-06-29","2"),
("EMF","2022-06-30","1"),
("BCC","2022-06-30","1")).toDF("region","file_crt_dt","file_vrsn").withColumn("file_crt_dt", col("file_crt_dt").cast("date")).withColumn("file_vrsn", col("file_vrsn").cast("int"))

val inputDistinctDF = demandDF1.select(col("rgn_nm"), col("file_crt_dt"), col("file_vrsn")).distinct()

val resultantDF = inputDistinctDF.join(outputDistinctDF,
  inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region")
  , "left_outer").select(inputDistinctDF.col("rgn_nm") as "input_region",
  inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
  inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
  outputDistinctDF.col("region") as "output_region",
  outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
  outputDistinctDF.col("file_vrsn") as "output_file_vrsn"
).withColumn("flag",
  when((
    col("output_region").isNull ||
    col("input_file_crt_dt").gt(col("output_file_crt_dt")) ||
    ( col("input_file_crt_dt").eq(col("output_file_crt_dt")) &&
      col("input_file_vrsn").gt(col("output_file_vrsn")) )
    ), lit("1")).otherwise(lit("0")))

Answer:

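I've carefully read your conditions and moved them into a separate variable in case something went wrong, but everything worked. The one change is that the equality check uses === instead of .eq. In Scala, eq is reference equality on AnyRef and returns a plain Boolean, so combining it with a Column through && does not type-check; that is the likely source of the type mismatch.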

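// Assumes a SparkSession named `spark` is in scope, as in spark-shell.
import org.apache.spark.sql.functions.when
import spark.implicits._  // enables the $"colName" syntax

val cond = (
  ($"input_file_crt_dt" > $"output_file_crt_dt") ||
  (($"input_file_crt_dt" === $"output_file_crt_dt") &&
   ($"input_file_vrsn" > $"output_file_vrsn")) ||
  $"output_region".isNull
)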

val resultantDF = inputDistinctDF
  .join(
    outputDistinctDF,
    inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"),
    "left_outer")
  .select(
    inputDistinctDF.col("rgn_nm") as "input_region",
    inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
    inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
    outputDistinctDF.col("region") as "output_region",
    outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
    outputDistinctDF.col("file_vrsn") as "output_file_vrsn")
  .withColumn("flag", when(cond, "1").otherwise("0"))
resultantDF.show()
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |         BCC|       2022-06-30|              1|          BCC|        2022-06-30|               1|   0|
// |        APCC|       2022-06-30|              1|         APCC|        2022-06-30|               1|   0|
// |    T2Region|       2022-06-29|              4|         null|              null|            null|   1|
// |         EMF|       2022-06-30|              1|          EMF|        2022-06-30|               1|   0|
// |         ODM|       2022-06-29|              3|          ODM|        2022-06-29|               2|   1|
// |         CCC|       2022-06-30|              1|          CCC|        2022-06-29|               1|   1|
// |         EMF|       2022-07-01|              1|          EMF|        2022-06-30|               1|   1|
// |         DAO|       2022-06-30|              1|          DAO|        2022-06-30|               1|   0|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
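
For readers who prefer the `col(...)` style used in the question, here is a minimal sketch of the same condition with `.eq` swapped for `===`. It assumes the error came from the `.eq` call (the original error screenshot is not available), and the names `FlagWithTripleEquals` and `withFlag` are hypothetical helpers introduced only for this illustration.

```scala
import java.sql.Date
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object FlagWithTripleEquals {
  // col("a").eq(col("b")) resolves to AnyRef.eq, a reference comparison that
  // returns a plain Boolean, so `Boolean && Column` does not compile.
  // Column's own equality operators are === and equalTo.
  val cond: Column =
    col("output_region").isNull ||
      col("input_file_crt_dt").gt(col("output_file_crt_dt")) ||
      (col("input_file_crt_dt") === col("output_file_crt_dt") &&
        col("input_file_vrsn").gt(col("output_file_vrsn")))

  // Expects the six renamed columns produced by the select in the question.
  def withFlag(joined: DataFrame): DataFrame =
    joined.withColumn("flag", when(cond, lit("1")).otherwise(lit("0")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flag-sketch")
      .getOrCreate()
    import spark.implicits._

    // Two already-joined rows copied from the question's expected table.
    val joined = Seq(
      ("ODM", Date.valueOf("2022-06-29"), 3, "ODM", Date.valueOf("2022-06-29"), 2),
      ("DAO", Date.valueOf("2022-06-30"), 1, "DAO", Date.valueOf("2022-06-30"), 1)
    ).toDF("input_region", "input_file_crt_dt", "input_file_vrsn",
           "output_region", "output_file_crt_dt", "output_file_vrsn")

    withFlag(joined).show()
    spark.stop()
  }
}
```

`equalTo` should work in place of `===` here if a method-call spelling is preferred; both return a `Column` rather than a `Boolean`.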