I have a dataframe like this:
name | time | statut |
---|---|---|
A | 1 | in |
A | 2 | out |
A | 3 | in |
A | 4 | out |
A | 5 | in |
B | 1 | in |
B | 4 | in |
B | 7 | out |
B | 18 | in |
For each group, I just want to get the last row where statut
= "out" and the row right after it. Like this:
name | time | statut |
---|---|---|
A | 4 | out |
A | 5 | in |
B | 7 | out |
B | 18 | in |
CodePudding user response:
This can be done using a couple of window functions, although the ordering used by one of the windows is not entirely straightforward. Note that the sample data below extends the question's data with ('B', 19, 'in') and ('C', 1, 'in') to cover edge cases: another 'in' row after the one directly following the last 'out', and a group with no 'out' at all.
from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('A', 1, 'in'),
     ('A', 2, 'out'),
     ('A', 3, 'in'),
     ('A', 4, 'out'),
     ('A', 5, 'in'),
     ('B', 1, 'in'),
     ('B', 4, 'in'),
     ('B', 7, 'out'),
     ('B', 18, 'in'),
     ('B', 19, 'in'),
     ('C', 1, 'in')],
    ['name', 'time', 'statut']
)
# Ordering by (statut != 'out') puts the 'out' rows first (False sorts before
# True), latest first, so the running max sees the last 'out' time on every row.
w_max_out = W.partitionBy('name').orderBy(F.col('statut') != 'out', F.desc('time'))
# With descending time order, lead('time') returns the previous row's time.
w_lead = W.partitionBy('name').orderBy(F.desc('time'))
df = df.withColumn('_lead', F.lead('time').over(w_lead))
df = df.withColumn('_max_out', F.max(F.when(F.col('statut') == 'out', F.col('time'))).over(w_max_out))
# Keep the last 'out' row and the row whose previous time is that 'out' time.
df = df.filter('(_max_out = time) or (_max_out = _lead)').drop('_max_out', '_lead')
Result:
df.show()
# +----+----+------+
# |name|time|statut|
# +----+----+------+
# |   A|   4|   out|
# |   A|   5|    in|
# |   B|   7|   out|
# |   B|  18|    in|
# +----+----+------+
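Side note (my own sketch, not part of the original answer): since max is an aggregate, an unordered window that spans the entire partition should produce the same _max_out without the ordering trick, assuming the running-max behaviour of the ordered window is not needed; w_name is a hypothetical name:

w_name = W.partitionBy('name')
df = df.withColumn(
    '_max_out',
    # Without orderBy, the frame is the whole partition, so the conditional
    # max already sees every 'out' time in the group.
    F.max(F.when(F.col('statut') == 'out', F.col('time'))).over(w_name)
)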
For reference, here is the result before the last filter line. Note that group C has no 'out' row, so its _max_out stays null and the filter drops it:
# +----+----+------+-----+--------+
# |name|time|statut|_lead|_max_out|
# +----+----+------+-----+--------+
# |   A|   4|   out|    3|       4|
# |   A|   2|   out|    1|       4|
# |   A|   5|    in|    4|       4|
# |   A|   3|    in|    2|       4|
# |   A|   1|    in| null|       4|
# |   B|   7|   out|    4|       7|
# |   B|  19|    in|   18|       7|
# |   B|  18|    in|    7|       7|
# |   B|   4|    in|    1|       7|
# |   B|   1|    in| null|       7|
# |   C|   1|    in| null|    null|
# +----+----+------+-----+--------+
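For comparison, here is an alternative sketch of my own (hedged, not from the original answer): compute the last 'out' time per group with a plain groupBy, join it back, and keep that row plus the first row after it. It assumes df is the original input DataFrame (before the transformations above); last_out_time and _next are hypothetical names:

from pyspark.sql import functions as F, Window as W

# Last 'out' time per name; groups with no 'out' row (like C) produce no
# row here and are dropped by the inner join below.
last_out = (df.filter(F.col('statut') == 'out')
              .groupBy('name')
              .agg(F.max('time').alias('last_out_time')))

result = (df.join(last_out, 'name')
            # Smallest time strictly after the last 'out' time in the group.
            .withColumn('_next', F.min(F.when(F.col('time') > F.col('last_out_time'),
                                              F.col('time'))).over(W.partitionBy('name')))
            .filter('(time = last_out_time) or (time = _next)')
            .drop('last_out_time', '_next'))

result.show()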