Suppose I have a dataframe:
val df = Seq(
(1,"A"),
(1,"B"),
(1,"C"),
(1,"D"),
(1,"E"),
(1,"F"),
(1,"G"),
(1,"H"),
(2,"I"),
(2,"J"),
(2,"J"),
(2,"J"),
(3,"K")
).toDF("id", "code")
I need to rank the rows within each id, starting a new rank every time the threshold number of rows is reached. Example:
threshold = 3
id code rank
1 A 1
1 B 1
1 C 1 -- threshold has been reached
1 D 2
1 E 2
1 F 2 -- threshold has been reached
1 G 3
1 H 3
2 I 1
2 J 1
2 J 1 -- threshold has been reached
2 J 2
3 K 1
How can I do it?
I can create a simple rank:
df.withColumn("rank", dense_rank().over(Window.orderBy("id")))
But how can I split the ranked groups by the threshold?
Answer:
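A solution that does not require moving all the data into a single partition (which is what a window with orderBy but no partitionBy, like the one in the question, does):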
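import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// largest number of rows sharing the same id
val maxGroupSize = df.groupBy("id").count().agg(max("count")).first().getLong(0)
val threshold = 3
// round maxGroupSize up to the next multiple of threshold
var f = maxGroupSize
while (f % threshold > 0) f = f + 1
df.withColumn("tmp1", col("id") * f) // each id starts its own block of f numbers
  // 0-based row position within the id; row_number counts duplicate codes (J, J, J) separately
  .withColumn("tmp2", row_number().over(Window.partitionBy("id").orderBy("code")) - 1)
  .withColumn("tmp3", col("tmp1") + col("tmp2"))
  .withColumn("rank", (col("tmp3") / threshold).cast("int"))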
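The numbering works because every id gets its own block of f consecutive numbers starting at id * f, and since f is a multiple of threshold and no row position within an id reaches f, each block splits into whole buckets of threshold numbers that never mix two ids. The arithmetic can be checked in plain Scala without Spark. In this sketch the `BucketArithmetic` object and its methods are hypothetical; the `while` loop is replaced by an equivalent closed-form round-up, and each row's 0-based position within its id plays the role of tmp2, counting duplicate codes separately:

```scala
object BucketArithmetic {
  // Smallest multiple of `threshold` that is >= `maxGroupSize`
  // (same value the while loop produces for maxGroupSize >= 1).
  def roundUp(maxGroupSize: Long, threshold: Long): Long =
    ((maxGroupSize + threshold - 1) / threshold) * threshold

  // tmp3 / threshold: id * f is the start of the id's block, position is 0-based.
  def bucket(id: Long, position: Long, f: Long, threshold: Long): Long =
    (id * f + position) / threshold

  // Returns (id, code, bucket) for every row, ordered by id and then code.
  def ranks(rows: Seq[(Int, String)], threshold: Long): Seq[(Int, String, Long)] = {
    val byId = rows.groupBy(_._1).toSeq.sortBy(_._1)
    val f = roundUp(byId.map(_._2.size.toLong).max, threshold)
    byId.flatMap { case (id, group) =>
      group.sortBy(_._2).zipWithIndex.map { case ((_, code), pos) =>
        (id, code, bucket(id, pos, f, threshold))
      }
    }
  }
}
```

With the question's data and threshold 3, f is 9 and the buckets come out as 3, 3, 3, 4, 4, 4, 5, 5 for id 1, then 6, 6, 6, 7 for id 2 (the fourth row of id 2 starts a new bucket) and 9 for id 3.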
Result:
+---+----+----+----+----+----+
| id|code|tmp1|tmp2|tmp3|rank|
+---+----+----+----+----+----+
|  1|   A|   9|   0|   9|   3|
|  1|   B|   9|   1|  10|   3|
|  1|   C|   9|   2|  11|   3|
|  1|   D|   9|   3|  12|   4|
|  1|   E|   9|   4|  13|   4|
|  1|   F|   9|   5|  14|   4|
|  1|   G|   9|   6|  15|   5|
|  1|   H|   9|   7|  16|   5|
|  2|   I|  18|   0|  18|   6|
|  2|   J|  18|   1|  19|   6|
|  3|   K|  27|   0|  27|   9|
+---+----+----+----+----+----+
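The downside of this approach is that the ranks are neither consecutive nor restarted per id (3, 4, 5, 6 and 9 in the result above). They could be made consecutive by adding another window to the chain:
.withColumn("rank2", dense_rank().over(Window.orderBy("rank")))
but a window without partitionBy would again move all the data into a single partition, processed by a single task.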
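A different approach from the one above, sketched under the assumption of Spark 3.x: if the ranks should restart for every id, as in the expected output in the question, the tmp1 offset is not needed at all. A per-id row_number() minus one, integer-divided by the threshold, already gives that numbering, and its only window is partitioned by id. A consecutive global numbering can then be built on top by adding, for each id, the number of buckets used by all smaller ids. That running sum does need a window without partitionBy, but it runs on the aggregated table with one row per id rather than on the full data. The `ThresholdRank` object and its method names are hypothetical, and ties in `code` (the three J rows) are ordered arbitrarily by row_number().

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object ThresholdRank {
  // Per-id rank: the first `threshold` rows of an id get 1, the next `threshold` rows get 2, ...
  // The window is partitioned by id, so no single-partition shuffle is needed.
  def perIdRank(df: DataFrame, threshold: Int): DataFrame = {
    val w = Window.partitionBy("id").orderBy("code")
    df.withColumn("rank", ((row_number().over(w) - 1) / threshold).cast("int") + 1)
  }

  // Consecutive rank across ids: shift each id's ranks by the buckets used by all smaller ids.
  // The unpartitioned window only sees the aggregated table (one row per id).
  def globalRank(df: DataFrame, threshold: Int): DataFrame = {
    val offsets = df.groupBy("id").count()
      .withColumn("buckets", ceil(col("count") / threshold))
      .withColumn("offset",
        sum("buckets").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, -1)))
      .select(col("id"), coalesce(col("offset"), lit(0L)).as("offset")) // smallest id has no predecessors
    perIdRank(df, threshold)
      .join(broadcast(offsets), "id")
      .withColumn("rank", col("rank") + col("offset"))
      .drop("offset")
  }
}
```

With the question's data and threshold 3, perIdRank gives 1, 1, 1, 2, 2, 2, 3, 3 for id 1, 1, 1, 1, 2 for id 2 and 1 for id 3, which is the expected output from the question; globalRank continues the numbering across ids (4, 4, 4, 5 for id 2 and 6 for id 3).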