I have a DataFrame like the one below:
Id1 | Id2 | Id3 | TaskId | TaskName | index |
---|---|---|---|---|---|
1 | 11 | bc123-234 | dfr3ws-45d | randomName1 | 1 |
1 | 11 | bc123-234 | er98d3-lkj | randomName2 | 2 |
1 | 11 | bc123-234 | hu77d9-mnb | randomName3 | 3 |
1 | 11 | bc123-234 | xc33d5-rew | deployhere4 | 4 |
1 | 11 | xre43-876 | dfr3ws-45d | randomName1 | 1 |
1 | 11 | xre43-876 | er98d3-lkj | deployhere2 | 2 |
1 | 11 | xre43-876 | hu77d9-mnb | randomName3 | 3 |
1 | 11 | xre43-876 | xc33d5-rew | randomName4 | 4 |
I partitioned the data by Id2 and Id3 and added a row_number as the `index` column.
I need to apply the following condition: TaskId "hu77d9-mnb" should come before the task whose name contains "deploy". As the table shows, the task names are random, so I need to read every name in the partition and find the one that contains "deploy".
If the index of the "deploy" task is greater than the index of TaskId "hu77d9-mnb", I mark every row of that partition with 1; otherwise with 0.
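The rule above can be sketched for a single partition in plain Scala, without Spark, to pin down the expected behaviour. This is a minimal sketch under assumptions: `TaskRow` and `DeployRule.result` are hypothetical names, and a partition that has no "deploy" task or no "hu77d9-mnb" task is assumed to get 0.

```scala
// Hypothetical row type; fields follow the question's columns
final case class TaskRow(id3: String, taskId: String, taskName: String, index: Int)

object DeployRule {
  // The task that must come before the "deploy" task (from the question)
  val SpecificTaskId = "hu77d9-mnb"

  /** 1 if the row whose name contains "deploy" has a larger index than the row
    * with SpecificTaskId; 0 otherwise, including when either row is missing (assumption). */
  def result(partition: Seq[TaskRow]): Int = {
    val deployIndex   = partition.find(_.taskName.contains("deploy")).map(_.index)
    val specificIndex = partition.find(_.taskId == SpecificTaskId).map(_.index)
    (deployIndex, specificIndex) match {
      case (Some(d), Some(s)) if d > s => 1
      case _                           => 0
    }
  }
}
```

Applied to the "bc123-234" rows this yields 1 (deploy at index 4, specific task at 3), and applied to the "xre43-876" rows it yields 0 (deploy at index 2).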
The final table should look like this:
Id1 | Id2 | Id3 | TaskId | TaskName | index | result |
---|---|---|---|---|---|---|
1 | 11 | bc123-234 | dfr3ws-45d | randomName1 | 1 | 1 |
1 | 11 | bc123-234 | er98d3-lkj | randomName2 | 2 | 1 |
1 | 11 | bc123-234 | hu77d9-mnb | randomName3 | 3 | 1 |
1 | 11 | bc123-234 | xc33d5-rew | deployhere4 | 4 | 1 |
1 | 11 | xre43-876 | dfr3ws-45d | randomName1 | 1 | 0 |
1 | 11 | xre43-876 | er98d3-lkj | deployhere2 | 2 | 0 |
1 | 11 | xre43-876 | hu77d9-mnb | randomName3 | 3 | 0 |
1 | 11 | xre43-876 | xc33d5-rew | randomName4 | 4 | 0 |
I am stuck at this point: how can I pass the partition's data to a UDF (or another kind of function, such as a UDAF) to perform this check? Any suggestion would be helpful. Thank you for your time.
Answer:
No UDF is needed. The index of the "deploy" row and the index of the specific row ("hu77d9-mnb") can be attached to every row of the partition with the window function `first` (ignoring nulls), and then simply compared:
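import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  (1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
  (1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
  (1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
  (1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
).toDF("Id1", "Id2", "Id3", "TaskID", "TaskName", "index")

// The task that has to come before the "deploy" task
val specificTaskId = "hu77d9-mnb"
// No orderBy: each window frame is the whole (Id1, Id2, Id3) partition
val idsWindow = Window.partitionBy("Id1", "Id2", "Id3")

val result = df
  // Index of the row whose TaskName contains "deploy"; when() yields null on the other rows,
  // and first(..., true) ignores those nulls, so every row gets the same value
  .withColumn("deployIndex",
    first(when(instr($"TaskName", "deploy") > 0, $"index"), true).over(idsWindow))
  // Index of the row with the specific TaskID, spread to every row of the partition
  .withColumn("specificTaskIdIndex",
    first(when($"TaskID" === lit(specificTaskId), $"index"), true).over(idsWindow))
  // 1 only if the "deploy" task comes after the specific task; 0 otherwise
  // (if either task is missing, the comparison is null and also falls through to 0)
  .withColumn("result",
    when($"deployIndex" > $"specificTaskIdIndex", 1).otherwise(0))

result.show(false)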
Output (the helper columns "deployIndex" and "specificTaskIdIndex" are shown for illustration; drop them afterwards with `.drop("deployIndex", "specificTaskIdIndex")`):
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|Id1|Id2|Id3      |TaskID    |TaskName   |index|deployIndex|specificTaskIdIndex|result|
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|1  |11 |bc123-234|dfr3ws-45d|randomName1|1    |4          |3                  |1     |
|1  |11 |bc123-234|er98d3-lkj|randomName2|2    |4          |3                  |1     |
|1  |11 |bc123-234|hu77d9-mnb|randomName3|3    |4          |3                  |1     |
|1  |11 |bc123-234|xc33d5-rew|deployhere4|4    |4          |3                  |1     |
|1  |11 |xre43-876|dfr3ws-45d|randomName1|1    |2          |3                  |0     |
|1  |11 |xre43-876|er98d3-lkj|deployhere2|2    |2          |3                  |0     |
|1  |11 |xre43-876|hu77d9-mnb|randomName3|3    |2          |3                  |0     |
|1  |11 |xre43-876|xc33d5-rew|randomName4|4    |2          |3                  |0     |
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
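If you really do want to hand all rows of a partition to an ordinary Scala function, as the question asks, the typed Dataset API offers `groupByKey` followed by `flatMapGroups` as an alternative to the window approach. This is a minimal sketch, assuming Spark 3.x on Scala 2.12/2.13; `Task`, `TaskResult`, `GroupedDeployCheck`, `labelGroup` and `run` are hypothetical names, and a group missing either task is assumed to get 0.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical typed rows; field names match the DataFrame columns in the answer
final case class Task(Id1: Int, Id2: Int, Id3: String, TaskID: String, TaskName: String, index: Int)
final case class TaskResult(Id1: Int, Id2: Int, Id3: String, TaskID: String, TaskName: String,
                            index: Int, result: Int)

object GroupedDeployCheck {
  // The task that must come before the "deploy" task
  val SpecificTaskId = "hu77d9-mnb"

  // Plain Scala function that receives every row of one (Id1, Id2, Id3) group
  def labelGroup(rows: Iterator[Task]): Iterator[TaskResult] = {
    val all = rows.toVector // the group iterator can only be traversed once
    val deployIdx   = all.find(_.TaskName.contains("deploy")).map(_.index)
    val specificIdx = all.find(_.TaskID == SpecificTaskId).map(_.index)
    // 1 only if both tasks exist and the "deploy" task comes later; otherwise 0
    val result = (deployIdx, specificIdx) match {
      case (Some(d), Some(s)) if d > s => 1
      case _                           => 0
    }
    all.iterator.map(t => TaskResult(t.Id1, t.Id2, t.Id3, t.TaskID, t.TaskName, t.index, result))
  }

  def run(spark: SparkSession): Dataset[TaskResult] = {
    import spark.implicits._
    val ds = Seq(
      Task(1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
      Task(1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
      Task(1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
      Task(1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
      Task(1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
      Task(1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
      Task(1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
      Task(1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
    ).toDS()
    // groupByKey + flatMapGroups passes each group's rows to labelGroup
    ds.groupByKey(t => (t.Id1, t.Id2, t.Id3))
      .flatMapGroups((_, rows) => labelGroup(rows))
  }
}
```

In an application with a `SparkSession` named `spark`, `GroupedDeployCheck.run(spark).show(false)` prints the labelled rows. Each group is collected into a `Vector` in memory and the lambda is opaque to the Catalyst optimizer, so for large partitions the window version is usually the better fit.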