id | value | prevValue | indicator |
---|---|---|---|
1 | emp1 | null | 1 |
2 | emp2 | emp1 | 2 |
3 | emp1 | emp2 | 3 |
4 | emp1 | emp1 | 3 |
5 | emp3 | emp1 | 4 |
6 | emp3 | emp3 | 4 |
7 | emp1 | emp3 | 5 |
8 | emp2 | emp1 | 6 |
9 | emp2 | emp2 | 6 |
10 | emp2 | emp2 | 6 |
Let's say this whole set of rows falls within one session window.
I created the prevValue column using the lag function:
lag("value", 1).over(sessionWindow).as("prevValue")
I want to create the indicator column as shown in the table, but haven't been successful yet.
A transition occurs when the value of the previous row differs from the value of the current row. Whenever there is a transition, the indicator is incremented by 1; otherwise it stays the same as in the previous row.
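To make the rule concrete, here is a minimal plain-Scala sketch (no Spark involved) run on the values from the table: flag each row with 1 if it differs from the previous row (or has no previous row), then take a running sum of the flags. The object name `IndicatorSketch` and the field names are only illustrative.

```scala
object IndicatorSketch {
  // Values from the table above, ordered by id.
  val values: Vector[String] =
    Vector("emp1", "emp2", "emp1", "emp1", "emp3", "emp3", "emp1", "emp2", "emp2", "emp2")

  // Previous value for each row; None for the first row (lag would give null there).
  val prevValues: Vector[Option[String]] = None +: values.init.map(Some(_))

  // 1 when the row differs from the previous one (or has no previous row), else 0.
  val changed: Vector[Int] =
    values.zip(prevValues).map { case (v, prev) => if (prev.contains(v)) 0 else 1 }

  // Running sum of the flags; scanLeft prepends the seed 0, so drop it with tail.
  val indicator: Vector[Int] = changed.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit =
    println(indicator.mkString(", ")) // prints 1, 2, 3, 3, 4, 4, 5, 6, 6, 6
}
```

The result matches the indicator column in the table, which suggests the column can be expressed as a cumulative sum of change flags rather than as a lag over itself.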
The main reason for creating the indicator column is to group by it later.
Here is what I tried in order to create the column, but it doesn't work. Any help resolving this would be appreciated.
.withColumn("indicator", when(col("prevValue").isNull, 1).otherwise(0))
.withColumn("indicator",
  when(col("value") =!= col("prevValue"), lag("indicator", 1).over(sessionWindow) + 1)
    .otherwise(lag("indicator", 1, 1).over(sessionWindow)))
Answer:
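// Window used to look at the previous row, ordered by id
val indWindow = Window.orderBy("id")
// Window covering every row from the start up to the current row
val sumWindow = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
// Flag each row whose value differs from the previous row, then take a running sum of the flags
df.withColumn("changed", when(col("value") =!= lag(col("value"), 1).over(indWindow), 1).otherwise(0))
  .withColumn("group", sum("changed").over(sumWindow))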
produces:
+---+-----+-------+-----+
| id|value|changed|group|
+---+-----+-------+-----+
|  1|    a|      0|    0|
|  2|    a|      0|    0|
|  3|    b|      1|    1|
|  3|    c|      1|    2|
|  4|    c|      0|    2|
|  5|    c|      0|    2|
+---+-----+-------+-----+
Note: this solution performs poorly, because a window with no partitionBy forces Spark to move all the rows into a single partition. Do you have a column to partition the data on?
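If such a column exists, the same idea could be applied per partition. The following is only a sketch under assumptions: `sessionCol` stands for a hypothetical session column that is not shown in the question, and `PartitionedIndicator` / `withIndicator` are made-up names. It also treats a null previous value as a change, so that numbering starts at 1 within each session, as in the question's table.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, sum, when}

object PartitionedIndicator {
  // sessionCol is an assumed column name identifying the session of each row.
  def withIndicator(df: DataFrame, sessionCol: String): DataFrame = {
    // Rows are ordered by id within each session instead of globally.
    val ordered = Window.partitionBy(sessionCol).orderBy("id")
    // Same window, restricted to rows from the session start up to the current row.
    val running = ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // Value of the previous row in the same session; null on the first row.
    val prev = lag(col("value"), 1).over(ordered)

    df.withColumn("changed", when(prev.isNull || col("value") =!= prev, 1).otherwise(0))
      .withColumn("indicator", sum("changed").over(running))
  }
}
```

With a partitioned window, Spark can process each session independently rather than collecting everything into one partition, and the indicator restarts at 1 for every session.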