I have two DataFrames that I am trying to join, and I want to assign a flag column based on the joined result.
demandDF1
+--------+-----------+---------+
|rgn_nm  |file_crt_dt|file_vrsn|
+--------+-----------+---------+
|DAO     |2022-06-30 |1        |
|DAO     |2022-06-30 |1        |
|CCC     |2022-06-30 |1        |
|APCC    |2022-06-30 |1        |
|ODM     |2022-06-29 |3        |
|EMF     |2022-06-30 |1        |
|T2Region|2022-06-29 |4        |
|BCC     |2022-06-30 |1        |
|EMF     |2022-07-01 |1        |
+--------+-----------+---------+
outputDistinctDF
+------+-----------+---------+
|region|file_crt_dt|file_vrsn|
+------+-----------+---------+
|DAO   |2022-06-30 |1        |
|CCC   |2022-06-29 |1        |
|APCC  |2022-06-30 |1        |
|ODM   |2022-06-29 |2        |
|EMF   |2022-06-30 |1        |
|BCC   |2022-06-30 |1        |
+------+-----------+---------+
I am trying to achieve something like the result below.
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|DAO         |2022-06-30       |1              |DAO          |2022-06-30        |1               |0   |
|CCC         |2022-06-30       |1              |CCC          |2022-06-29        |1               |1   |
|T2Region    |2022-06-29       |4              |null         |null              |null            |1   |
|ODM         |2022-06-29       |3              |ODM          |2022-06-29        |2               |1   |
|APCC        |2022-06-30       |1              |APCC         |2022-06-30        |1               |0   |
|EMF         |2022-07-01       |1              |EMF          |2022-06-30        |1               |1   |
|EMF         |2022-06-30       |1              |EMF          |2022-06-30        |1               |0   |
|BCC         |2022-06-30       |1              |BCC          |2022-06-30        |1               |0   |
+------------+-----------------+---------------+-------------+------------------+----------------+----+
The logic: flag = 1 if any of the following holds, otherwise flag = 0:
(input_file_crt_dt > output_file_crt_dt) or
(input_file_crt_dt = output_file_crt_dt and input_file_vrsn > output_file_vrsn) or
(output_region is null)
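For reference, the rule above can be sketched in plain Scala, independent of Spark. This is only an illustration: FileRef and flag are hypothetical names that do not appear in my actual code, and a missing output row (the null case after a left join) is modeled as None.

```scala
import java.time.LocalDate

object FlagRule {
  // Hypothetical record type standing in for one row of either DataFrame.
  final case class FileRef(region: String, crtDt: LocalDate, vrsn: Int)

  // Returns 1 when the input row is newer than its matching output row,
  // or when there is no matching output row at all (None); otherwise 0.
  def flag(input: FileRef, output: Option[FileRef]): Int = output match {
    case None => 1
    case Some(out) =>
      val newerDate = input.crtDt.isAfter(out.crtDt)
      val sameDateNewerVersion =
        input.crtDt.isEqual(out.crtDt) && input.vrsn > out.vrsn
      if (newerDate || sameDateNewerVersion) 1 else 0
  }

  def main(args: Array[String]): Unit = {
    val d29 = LocalDate.parse("2022-06-29")
    val d30 = LocalDate.parse("2022-06-30")
    println(flag(FileRef("DAO", d30, 1), Some(FileRef("DAO", d30, 1)))) // 0
    println(flag(FileRef("CCC", d30, 1), Some(FileRef("CCC", d29, 1)))) // 1
    println(flag(FileRef("ODM", d29, 3), Some(FileRef("ODM", d29, 2)))) // 1
    println(flag(FileRef("T2Region", d29, 4), None))                    // 1
  }
}
```

The four sample calls correspond to the DAO, CCC, ODM and T2Region rows of the expected result.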
I tried the following code, but it fails with an error. These are the steps I followed:
// Assumes a SparkSession named spark is in scope, as in spark-shell.
import org.apache.spark.sql.functions.{col, lit, when}
import spark.implicits._

val demandDF1 = Seq(
  ("DAO", "2022-06-30", "1"),
  ("DAO", "2022-06-30", "1"),
  ("CCC", "2022-06-30", "1"),
  ("APCC", "2022-06-30", "1"),
  ("ODM", "2022-06-29", "3"),
  ("EMF", "2022-06-30", "1"),
  ("T2Region", "2022-06-29", "4"),
  ("BCC", "2022-06-30", "1"),
  ("EMF", "2022-07-01", "1")
).toDF("rgn_nm", "file_crt_dt", "file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

val outputDistinctDF = Seq(
  ("DAO", "2022-06-30", "1"),
  ("CCC", "2022-06-29", "1"),
  ("APCC", "2022-06-30", "1"),
  ("ODM", "2022-06-29", "2"),
  ("EMF", "2022-06-30", "1"),
  ("BCC", "2022-06-30", "1")
).toDF("region", "file_crt_dt", "file_vrsn")
  .withColumn("file_crt_dt", col("file_crt_dt").cast("date"))
  .withColumn("file_vrsn", col("file_vrsn").cast("int"))

// Drop the duplicate DAO row before joining.
val inputDistinctDF = demandDF1.select(col("rgn_nm"), col("file_crt_dt"), col("file_vrsn")).distinct()
val resultantDF = inputDistinctDF
  .join(
    outputDistinctDF,
    inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"),
    "left_outer")
  .select(
    inputDistinctDF.col("rgn_nm") as "input_region",
    inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
    inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
    outputDistinctDF.col("region") as "output_region",
    outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
    outputDistinctDF.col("file_vrsn") as "output_file_vrsn")
  .withColumn("flag",
    when(
      col("output_region").isNull ||
        col("input_file_crt_dt").gt(col("output_file_crt_dt")) ||
        (col("input_file_crt_dt").eq(col("output_file_crt_dt")) &&
          col("input_file_vrsn").gt(col("output_file_vrsn"))),
      lit("1")).otherwise(lit("0")))
CodePudding user response:
I read through your conditions and moved them into a separate variable so they are easier to verify, using === for the column equality check. With that, everything works as expected:
// The $"..." column syntax requires import spark.implicits._
val cond = (
  ($"input_file_crt_dt" > $"output_file_crt_dt") ||
  (($"input_file_crt_dt" === $"output_file_crt_dt") &&
    ($"input_file_vrsn" > $"output_file_vrsn")) ||
  $"output_region".isNull
)

val resultantDF = inputDistinctDF
  .join(
    outputDistinctDF,
    inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"),
    "left_outer")
  .select(
    inputDistinctDF.col("rgn_nm") as "input_region",
    inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
    inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
    outputDistinctDF.col("region") as "output_region",
    outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
    outputDistinctDF.col("file_vrsn") as "output_file_vrsn")
  .withColumn("flag", when(cond, "1").otherwise("0"))

resultantDF.show()
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
// |         BCC|       2022-06-30|              1|          BCC|        2022-06-30|               1|   0|
// |        APCC|       2022-06-30|              1|         APCC|        2022-06-30|               1|   0|
// |    T2Region|       2022-06-29|              4|         null|              null|            null|   1|
// |         EMF|       2022-06-30|              1|          EMF|        2022-06-30|               1|   0|
// |         ODM|       2022-06-29|              3|          ODM|        2022-06-29|               2|   1|
// |         CCC|       2022-06-30|              1|          CCC|        2022-06-29|               1|   1|
// |         EMF|       2022-07-01|              1|          EMF|        2022-06-30|               1|   1|
// |         DAO|       2022-06-30|              1|          DAO|        2022-06-30|               1|   0|
// +------------+-----------------+---------------+-------------+------------------+----------------+----+
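A note on what probably broke the original attempt: it compares the two dates with .eq, which in Scala is the reference-equality method inherited from AnyRef. It returns a plain Boolean rather than a Column, so it cannot be combined with && and another Column. The sketch below shows the difference without needing Spark. Expr is a hypothetical stand-in for Column that I made up for this illustration, not Spark's real class.

```scala
object EqVsTripleEquals {
  // Hypothetical stand-in for Spark's Column: each operator builds a new
  // expression instead of evaluating to a Boolean.
  final case class Expr(sql: String) {
    def ===(other: Expr): Expr = Expr(s"(${sql} = ${other.sql})")
    def >(other: Expr): Expr = Expr(s"(${sql} > ${other.sql})")
    def &&(other: Expr): Expr = Expr(s"(${sql} AND ${other.sql})")
  }

  val inDt = Expr("input_file_crt_dt")
  val outDt = Expr("output_file_crt_dt")
  val inV = Expr("input_file_vrsn")
  val outV = Expr("output_file_vrsn")

  // eq comes from AnyRef: it compares object references and yields a Boolean.
  val referenceEquality: Boolean = inDt.eq(outDt)

  // === yields another Expr, so it composes with && and >.
  val sameDateNewerVersion: Expr = (inDt === outDt) && (inV > outV)

  // The next line would not compile: Boolean's && expects a Boolean, not an Expr.
  // val broken = inDt.eq(outDt) && (inV > outV)

  def main(args: Array[String]): Unit = {
    println(referenceEquality)        // false
    println(sameDateNewerVersion.sql) // ((input_file_crt_dt = output_file_crt_dt) AND (input_file_vrsn > output_file_vrsn))
  }
}
```

With real Spark columns, === or equalTo is the equality test to use when building a condition.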