Scala: pass a window partition dataset to a UDF

Time:09-21

I have a DataFrame like the one below:

Id1  Id2  Id3        TaskId      TaskName     index
1    11   bc123-234  dfr3ws-45d  randomName1  1
1    11   bc123-234  er98d3-lkj  randomName2  2
1    11   bc123-234  hu77d9-mnb  randomName3  3
1    11   bc123-234  xc33d5-rew  deployhere4  4
1    11   xre43-876  dfr3ws-45d  randomName1  1
1    11   xre43-876  er98d3-lkj  deployhere2  2
1    11   xre43-876  hu77d9-mnb  randomName3  3
1    11   xre43-876  xc33d5-rew  randomName4  4

I partitioned the data by Id2 and Id3 and added a row_number as the index column.
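For reference, here is a minimal sketch of what that per-partition numbering computes, modeled with plain Scala collections so it runs without a SparkSession. In Spark itself this is usually written as row_number().over(Window.partitionBy("Id2", "Id3").orderBy(...)). The question does not name the ordering column, so ordering by TaskId here is an assumption (it happens to reproduce the sample's index values); RawTask and RowNumberSketch are hypothetical names.

```scala
// Plain-Scala model of row_number() over a window partitioned by (Id2, Id3).
// ASSUMPTION: rows are ordered by TaskId inside each partition; the question
// does not name its ordering column.
final case class RawTask(id1: Int, id2: Int, id3: String, taskId: String, taskName: String)

object RowNumberSketch {
  // Pairs every row with its 1-based position inside its (id2, id3) partition.
  def withRowNumber(rows: Seq[RawTask]): Seq[(RawTask, Int)] =
    rows
      .groupBy(r => (r.id2, r.id3))            // one group per window partition
      .toSeq
      .sortBy(_._1)                            // deterministic order of partitions
      .flatMap { case (_, part) =>
        part
          .sortBy(_.taskId)                    // stands in for the window's orderBy
          .zipWithIndex
          .map { case (r, i) => (r, i + 1) }   // row_number() starts at 1
      }

  // A shuffled subset of the question's rows.
  val sample: Seq[RawTask] = Seq(
    RawTask(1, 11, "xre43-876", "hu77d9-mnb", "randomName3"),
    RawTask(1, 11, "bc123-234", "xc33d5-rew", "deployhere4"),
    RawTask(1, 11, "bc123-234", "dfr3ws-45d", "randomName1"),
    RawTask(1, 11, "xre43-876", "dfr3ws-45d", "randomName1")
  )

  def main(args: Array[String]): Unit =
    withRowNumber(sample).foreach { case (r, n) => println(s"${r.id3} ${r.taskId} $n") }
}
```

Running main prints the rows of each partition in TaskId order, numbered from 1 again in every partition.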

I need to apply the following condition:

TaskId "hu77d9-mnb" should come before the task whose name contains "deploy". As the table above suggests, the names are random, so I need to read every name in the partition and find the one that contains "deploy".

If the index of the "deploy" TaskName is greater than the index of TaskId "hu77d9-mnb", I mark every row of that partition with 1, otherwise with 0.
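The rule itself, applied to a single partition, can be sketched as an ordinary Scala function. This is a plain-collections model, not Spark code; TaskRow, DeployRule and partitionResult are hypothetical names, and returning 0 when either task is missing from the partition is an assumption the question does not spell out.

```scala
// Model of the rule for ONE partition, using plain Scala collections.
// ASSUMPTION: a partition with no "deploy" task or no "hu77d9-mnb" task gets 0.
final case class TaskRow(taskId: String, taskName: String, index: Int)

object DeployRule {
  val SpecificTaskId = "hu77d9-mnb"

  // 1 if the "deploy" task's index is greater than the specific task's index, else 0.
  // If several names contain "deploy", the first one in the sequence is used.
  def partitionResult(rows: Seq[TaskRow]): Int = {
    val deployIndex   = rows.find(_.taskName.contains("deploy")).map(_.index)
    val specificIndex = rows.find(_.taskId == SpecificTaskId).map(_.index)
    (deployIndex, specificIndex) match {
      case (Some(d), Some(s)) if d > s => 1
      case _                           => 0
    }
  }

  // The two partitions from the question.
  val bc123: Seq[TaskRow] = Seq(
    TaskRow("dfr3ws-45d", "randomName1", 1), TaskRow("er98d3-lkj", "randomName2", 2),
    TaskRow("hu77d9-mnb", "randomName3", 3), TaskRow("xc33d5-rew", "deployhere4", 4))
  val xre43: Seq[TaskRow] = Seq(
    TaskRow("dfr3ws-45d", "randomName1", 1), TaskRow("er98d3-lkj", "deployhere2", 2),
    TaskRow("hu77d9-mnb", "randomName3", 3), TaskRow("xc33d5-rew", "randomName4", 4))

  def main(args: Array[String]): Unit = {
    println(partitionResult(bc123)) // 1
    println(partitionResult(xre43)) // 0
  }
}
```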

The final table should look like this:

Id1  Id2  Id3        TaskId      TaskName     index  result
1    11   bc123-234  dfr3ws-45d  randomName1  1      1
1    11   bc123-234  er98d3-lkj  randomName2  2      1
1    11   bc123-234  hu77d9-mnb  randomName3  3      1
1    11   bc123-234  xc33d5-rew  deployhere4  4      1
1    11   xre43-876  dfr3ws-45d  randomName1  1      0
1    11   xre43-876  er98d3-lkj  deployhere2  2      0
1    11   xre43-876  hu77d9-mnb  randomName3  3      0
1    11   xre43-876  xc33d5-rew  randomName4  4      0

I am stuck at this point: how can I pass the partition data to a UDF (or another function such as a UDAF) to perform this task? Any suggestion will be helpful. Thank you for your time.
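One general way to hand a whole group of rows to ordinary Scala code is the typed Dataset API, where groupByKey(...).flatMapGroups(...) takes a function of the group key and an iterator over the group's rows. The sketch below only shows the shape such a function could have; it is exercised with plain Scala collections so it runs without a SparkSession. Task, TaskWithResult, PerGroupSketch, tagGroup and run are hypothetical names, and the 0-when-missing behavior is an assumption.

```scala
// Sketch of a whole-group function shaped like the one Spark's typed
// Dataset.groupByKey(...).flatMapGroups(...) takes: (key, Iterator[row]) => rows out.
// Driven by plain Scala collections here, so no SparkSession is needed.
final case class Task(id1: Int, id2: Int, id3: String, taskId: String, taskName: String, index: Int)
final case class TaskWithResult(task: Task, result: Int)

object PerGroupSketch {
  val SpecificTaskId = "hu77d9-mnb"

  // Sees every row of one (id2, id3) group and tags each row with the group's result.
  def tagGroup(key: (Int, String), rows: Iterator[Task]): Iterator[TaskWithResult] = {
    val group    = rows.toVector // an iterator can be traversed only once, so buffer it
    val deploy   = group.find(_.taskName.contains("deploy")).map(_.index)
    val specific = group.find(_.taskId == SpecificTaskId).map(_.index)
    val result = (deploy, specific) match {
      case (Some(d), Some(s)) if d > s => 1
      case _                           => 0 // also 0 if either task is missing (assumption)
    }
    group.iterator.map(TaskWithResult(_, result))
  }

  // Local stand-in for roughly groupByKey(t => (t.id2, t.id3)).flatMapGroups((k, it) => tagGroup(k, it)).
  def run(tasks: Seq[Task]): Seq[TaskWithResult] =
    tasks
      .groupBy(t => (t.id2, t.id3))
      .toSeq
      .sortBy(_._1)
      .flatMap { case (key, rows) => tagGroup(key, rows.iterator) }

  val sample: Seq[Task] = Seq(
    Task(1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
    Task(1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
    Task(1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
    Task(1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
    Task(1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
    Task(1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
    Task(1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
    Task(1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
  )

  def main(args: Array[String]): Unit =
    run(sample).foreach(r => println(s"${r.task.id3} ${r.task.taskName} ${r.result}"))
}
```

The trade-off of this style is that each group is materialized in memory inside one task, which is fine for small groups like these but worth keeping in mind for very large partitions.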

Answer:

No UDF is needed: the index of the "deploy" row and the index of the specific row ("hu77d9-mnb") can be attached to every row of the partition with the window function "first" (skipping nulls), and then simply compared:

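import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Assumes a SparkSession named `spark` (e.g. in spark-shell); its implicits
// provide toDF and the $"column" syntax.
import spark.implicits._

val df = Seq(
  (1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
  (1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
  (1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
  (1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
).toDF("Id1", "Id2", "Id3", "TaskID", "TaskName", "index")

val specificTaskId = "hu77d9-mnb"
// No orderBy: each row's frame is its whole partition.
val idsWindow = Window.partitionBy("Id1", "Id2", "Id3")

val withResult = df
  // Index of the row whose TaskName contains "deploy"; when(...) without otherwise
  // yields null for all other rows, and ignoreNulls = true makes first() skip them.
  .withColumn("deployIndex",
    first(when(instr($"TaskName", "deploy") > 0, $"index"), ignoreNulls = true)
      .over(idsWindow))
  // Index of the row with the specific TaskID, repeated on every row of the partition.
  .withColumn("specificTaskIdIndex",
    first(when($"TaskID" === lit(specificTaskId), $"index"), ignoreNulls = true)
      .over(idsWindow))
  // 1 if the deploy task comes after the specific task, otherwise 0
  // (a null index makes the comparison null, which also falls through to 0).
  .withColumn("result",
    when($"deployIndex" > $"specificTaskIdIndex", 1).otherwise(0))

withResult.show(false)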

Output (the helper columns "deployIndex" and "specificTaskIdIndex" can be dropped afterwards, e.g. with .drop("deployIndex", "specificTaskIdIndex")):

+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|Id1|Id2|Id3      |TaskID    |TaskName   |index|deployIndex|specificTaskIdIndex|result|
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|1  |11 |bc123-234|dfr3ws-45d|randomName1|1    |4          |3                  |1     |
|1  |11 |bc123-234|er98d3-lkj|randomName2|2    |4          |3                  |1     |
|1  |11 |bc123-234|hu77d9-mnb|randomName3|3    |4          |3                  |1     |
|1  |11 |bc123-234|xc33d5-rew|deployhere4|4    |4          |3                  |1     |
|1  |11 |xre43-876|dfr3ws-45d|randomName1|1    |2          |3                  |0     |
|1  |11 |xre43-876|er98d3-lkj|deployhere2|2    |2          |3                  |0     |
|1  |11 |xre43-876|hu77d9-mnb|randomName3|3    |2          |3                  |0     |
|1  |11 |xre43-876|xc33d5-rew|randomName4|4    |2          |3                  |0     |
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
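One detail worth checking in the final comparison is how Spark treats nulls. When a partition has no "deploy" task (or no "hu77d9-mnb" task), first(..., ignoreNulls = true) returns null, a comparison with null is itself null, and when(...) treats a null condition as not matched, so the otherwise(...) value is used. The sketch below models that three-valued logic in plain Scala, with None standing in for SQL null, to compare two ways of writing the result column: when(specificTaskIdIndex > deployIndex, 0).otherwise(1) and when(deployIndex > specificTaskIdIndex, 1).otherwise(0). NullComparisonSketch and its helpers are hypothetical names.

```scala
// Plain-Scala model of Spark SQL's null handling in the result comparison.
// Option[Int] stands in for a nullable column value; None plays the role of null.
object NullComparisonSketch {
  // SQL `a > b`: null (None) if either side is null.
  def gt(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x > y

  // SQL when(cond, t).otherwise(f): a null condition falls through to `otherwise`.
  def whenOtherwise(cond: Option[Boolean], t: Int, f: Int): Int =
    if (cond.contains(true)) t else f

  // when(specificTaskIdIndex > deployIndex, 0).otherwise(1)
  def resultA(deploy: Option[Int], specific: Option[Int]): Int =
    whenOtherwise(gt(specific, deploy), 0, 1)

  // when(deployIndex > specificTaskIdIndex, 1).otherwise(0)
  def resultB(deploy: Option[Int], specific: Option[Int]): Int =
    whenOtherwise(gt(deploy, specific), 1, 0)

  def main(args: Array[String]): Unit = {
    val cases: Seq[(String, Option[Int], Option[Int])] = Seq(
      ("deploy after specific", Some(4), Some(3)),
      ("deploy before specific", Some(2), Some(3)),
      ("no deploy task", None, Some(3))
    )
    for ((label, d, s) <- cases)
      println(s"$label: A=${resultA(d, s)} B=${resultB(d, s)}")
  }
}
```

Both forms agree on the question's data; they differ only when one of the indexes is missing, where the second form yields 0, in line with the question's "otherwise 0" rule.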