I want to split my PySpark DataFrame into groups of consecutive rows with a monotonically increasing (non-decreasing) trend, and keep only the groups with more than 10 rows.
Here is the code I tried so far:
from pyspark.sql import functions as F, Window
df = df1.withColumn(
    "FLAG_INCREASE",
    F.when(
        F.col("x")
        > F.lag("x").over(Window.partitionBy("x1").orderBy("time")),
        1,
    ).otherwise(0),
)
I don't know how to group consecutive ones in PySpark. Does anyone have a better solution for this?
In pandas, the same thing can be done like this:
df = df1.groupby((df1['x'].diff() < 0).cumsum())
How can I convert this code to PySpark?
Example DataFrame:
x
0 1
1 2
2 2
3 2
4 3
5 3
6 4
7 5
8 4
9 3
10 2
11 1
12 2
13 3
14 4
15 5
16 5
17 6
Expected output:
group1:
x
0 1
1 2
2 2
3 2
4 3
5 3
6 4
7 5
group2:
x
0 1
1 2
2 3
3 4
4 5
5 5
6 6
Answer:
I'll map out all the steps (keeping every intermediate column in the output) to replicate (df1['x'].diff() < 0).cumsum(), which is easy to calculate with a lag window function.
However, your data must have an ID column that records the row order: unlike pandas, Spark does not preserve a DataFrame's row order, because the data is distributed across partitions. For this example, I've assumed that your data has an ID column named idx, which is the index printed in your example input, and that the value column x is named val.
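To check what the lag-and-cumsum approach should produce, here is a small plain-Python model of (df1['x'].diff() < 0).cumsum(), run on the question's example values. It is not Spark code, and the helper names group_ids and keep_large_groups are hypothetical. A new group starts whenever a value drops below the previous one, so equal consecutive values (2, 2, 2) stay in the same group.

```python
# Plain-Python model of pandas' (x.diff() < 0).cumsum(); a checking aid, not Spark code.
# group_ids and keep_large_groups are hypothetical helper names.
from collections import Counter


def group_ids(values):
    """Start a new group id every time a value drops below its predecessor."""
    ids = []
    current = 0
    prev = None
    for v in values:
        # the first row has no predecessor (a null lag in Spark), so it never starts a group
        if prev is not None and v < prev:
            current += 1
        ids.append(current)
        prev = v
    return ids


def keep_large_groups(values, min_size):
    """Return the runs, in order, whose row count is strictly greater than min_size."""
    ids = group_ids(values)
    sizes = Counter(ids)
    runs = {}
    for gid, v in zip(ids, values):
        if sizes[gid] > min_size:
            runs.setdefault(gid, []).append(v)
    return list(runs.values())


# values from the question's example DataFrame, in idx order
vals = [1, 2, 2, 2, 3, 3, 4, 5, 4, 3, 2, 1, 2, 3, 4, 5, 5, 6]
print(group_ids(vals))
print(keep_large_groups(vals, min_size=1))
```

On this data the group id is 0 for idx 0-7, then 1, 2 and 3 for idx 8-10, then 4 for idx 11-17, matching the diff_lt_0_cumsum column in the Spark output. The two runs in the question's expected output have 8 and 7 rows, so min_size=1 reproduces it; the stated threshold of 10 (min_size=10) would drop both.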
# input data
data_sdf.show(5)
# +---+---+
# |idx|val|
# +---+---+
# |  0|  1|
# |  1|  2|
# |  2|  2|
# |  3|  2|
# |  4|  3|
# +---+---+
# only showing top 5 rows
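# calculating the group column:
#   diff_with_prevval - current val minus the previous row's val (null for the first row)
#   diff_lt_0         - 1 where the value dropped, else 0 (the first row's null becomes 0)
#   diff_lt_0_cumsum  - running total of drops, i.e. the group id
from pyspark.sql import functions as func
from pyspark.sql import Window as wd

data_sdf. \
    withColumn('diff_with_prevval',
               func.col('val') - func.lag('val').over(wd.orderBy('idx'))
               ). \
    withColumn('diff_lt_0',
               func.coalesce((func.col('diff_with_prevval') < 0).cast('int'), func.lit(0))
               ). \
    withColumn('diff_lt_0_cumsum',
               func.sum('diff_lt_0').over(
                   wd.orderBy('idx').rowsBetween(wd.unboundedPreceding, wd.currentRow))
               ). \
    show()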
# +---+---+-----------------+---------+----------------+
# |idx|val|diff_with_prevval|diff_lt_0|diff_lt_0_cumsum|
# +---+---+-----------------+---------+----------------+
# |  0|  1|             null|        0|               0|
# |  1|  2|                1|        0|               0|
# |  2|  2|                0|        0|               0|
# |  3|  2|                0|        0|               0|
# |  4|  3|                1|        0|               0|
# |  5|  3|                0|        0|               0|
# |  6|  4|                1|        0|               0|
# |  7|  5|                1|        0|               0|
# |  8|  4|               -1|        1|               1|
# |  9|  3|               -1|        1|               2|
# | 10|  2|               -1|        1|               3|
# | 11|  1|               -1|        1|               4|
# | 12|  2|                1|        0|               4|
# | 13|  3|                1|        0|               4|
# | 14|  4|                1|        0|               4|
# | 15|  5|                1|        0|               4|
# | 16|  5|                0|        0|               4|
# | 17|  6|                1|        0|               4|
# +---+---+-----------------+---------+----------------+
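You can now use the diff_lt_0_cumsum column in your groupBy() for further processing, for example to count the rows in each group and keep only the groups with more than 10 rows.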