I have this data frame, and I would like to scan it: while Condition is 1, I collect the values and take their maximum. For example, max(16, 8) = 16; max(6, 10, 9, 8) = 10.
Identifiant | Value | Condition |
---|---|---|
ID1 | 16 | 1 |
ID2 | 8 | 1 |
ID3 | 4 | 0 |
ID4 | 5 | 0 |
ID5 | 6 | 1 |
ID6 | 10 | 1 |
ID7 | 9 | 1 |
ID8 | 8 | 1 |
ID9 | 9 | 0 |
ID10 | 11 | 0 |
ID11 | 6 | 1 |
ID12 | 8 | 1 |
ID13 | 10 | 0 |
ID13 | 12 | 1 |
ID14 | 15 | 0 |
ID15 | 14 | 1 |
ID16 | 8 | 1 |
ID17 | 9 | 1 |
We obtain this table:
Identifiant | Value | Computed as |
---|---|---|
ID1 | 16 | max(16,8) |
ID6 | 10 | max(6,10,9,8) |
ID12 | 8 | max(6,8) |
ID13 | 12 | max(12) |
ID15 | 14 | max(14,8,9) |
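In plain Python terms (with the sample data hard-coded here for illustration), the desired scan can be sketched with `itertools.groupby`: group consecutive rows by Condition and, for each run of 1s, keep the row holding the maximum Value:

```python
from itertools import groupby

# (Identifiant, Value, Condition) rows from the table above
rows = [
    ("ID1", 16, 1), ("ID2", 8, 1), ("ID3", 4, 0), ("ID4", 5, 0),
    ("ID5", 6, 1), ("ID6", 10, 1), ("ID7", 9, 1), ("ID8", 8, 1),
    ("ID9", 9, 0), ("ID10", 11, 0), ("ID11", 6, 1), ("ID12", 8, 1),
    ("ID13", 10, 0), ("ID13", 12, 1), ("ID14", 15, 0),
    ("ID15", 14, 1), ("ID16", 8, 1), ("ID17", 9, 1),
]

result = []
# group consecutive rows that share the same Condition value
for cond, run in groupby(rows, key=lambda r: r[2]):
    run = list(run)
    if cond == 1:
        # keep the row with the maximum Value in this run of 1s
        best = max(run, key=lambda r: r[1])
        result.append((best[0], best[1]))

print(result)
# [('ID1', 16), ('ID6', 10), ('ID12', 8), ('ID13', 12), ('ID15', 14)]
```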
CodePudding user response:
You can create separate windows for this. First, create a window to compute a helper column that assigns a group number to each consecutive run: the first 1,1 gets one number, the following 0,0 the next, then 1,1,1,1 the next, and so on. Once we have that, we can use the Condition column to filter out the zeroes, then take the max per partition of the helper column by applying a descending rank on Value and keeping rank 1:
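Concretely, here is the group numbering the helper column produces on the sample Condition sequence (computed in plain Python just for illustration; the Spark version below builds the same running count with `lag` and a cumulative `sum`):

```python
# sample Condition column from the question, in row order
conds = [1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1]

helper = []
group = 0
prev = None
for c in conds:
    if prev is not None and c != prev:
        group += 1          # Condition changed: start a new group
    helper.append(group)
    prev = c

print(helper)
# [0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 5, 6, 7, 8, 8, 8]
# the runs of 1s fall in groups 0, 2, 4, 6 and 8
```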
from pyspark.sql import functions as F, Window as W

# NOTE: this assumes "Identifiant" sorts in the original row order;
# with IDs like ID10 a lexicographic sort would break the grouping,
# so order by a numeric row-order column if needed
w = W.orderBy("Identifiant")
w1 = w.rangeBetween(W.unboundedPreceding, 0)
w2 = W.partitionBy("Helper").orderBy(F.desc("Value"))

# flag rows where Condition differs from the previous row
c = (F.col("Condition") != F.lag("Condition").over(w)).cast("int")

out = (
    df.withColumn("Helper", c).fillna({"Helper": 0})    # first row has no lag
      .withColumn("Helper", F.sum("Helper").over(w1))   # running sum -> group number
      .filter("Condition != 0")                         # keep only the runs of 1s
      .withColumn("Rnk", F.dense_rank().over(w2))       # rank Values per group, descending
      .filter("Rnk == 1")                               # top Value in each group
      .drop("Helper", "Rnk", "Condition")
)
out.show()
+-----------+-----+
|Identifiant|Value|
+-----------+-----+
|        ID1|   16|
|        ID6|   10|
|       ID12|    8|
|       ID13|   12|
|       ID15|   14|
+-----------+-----+
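The same helper-column trick ports directly to pandas, which can serve as a local sanity check of the logic without a Spark session (assuming pandas is available):

```python
import pandas as pd

# sample data from the question
df = pd.DataFrame({
    "Identifiant": ["ID1", "ID2", "ID3", "ID4", "ID5", "ID6", "ID7", "ID8", "ID9",
                    "ID10", "ID11", "ID12", "ID13", "ID13", "ID14", "ID15", "ID16", "ID17"],
    "Value": [16, 8, 4, 5, 6, 10, 9, 8, 9, 11, 6, 8, 10, 12, 15, 14, 8, 9],
    "Condition": [1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1],
})

# same idea: cumulative sum of Condition changes gives a run id
df["Helper"] = df["Condition"].ne(df["Condition"].shift()).cumsum()
ones = df[df["Condition"] == 1]
# keep the row with the max Value in each run of 1s
out = ones.loc[ones.groupby("Helper")["Value"].idxmax(), ["Identifiant", "Value"]]
print(out)
```

Note that `idxmax` keeps the first row on ties, which matches the `dense_rank`-based Spark version only when each group's maximum is unique.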