How to group by consecutive 1s in a column in PySpark and keep groups of a specific size


I want to split my PySpark dataframe into groups with a monotonically increasing trend and keep the groups with size greater than 10.

Here is part of the code I tried:

from pyspark.sql import functions as F, Window

# flag rows where x increased from the previous row (per x1, ordered by time)
df = df1.withColumn(
    "FLAG_INCREASE",
    F.when(
        F.col("x")
        > F.lag("x").over(Window.partitionBy("x1").orderBy("time")),
        1,
    ).otherwise(0),
)

I don't know how to group by consecutive 1s in PySpark... Does anyone have a better solution for this?

The same thing can be done in pandas like this:

df = df1.groupby((df1['x'].diff() < 0).cumsum())

How can I convert this code to PySpark?
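
For reference, a minimal sketch (with made-up values, not my real data) of what that pandas grouping key evaluates to:

import pandas as pd

# each drop in x (diff < 0) starts a new group; cumsum turns that into a group id
s = pd.Series([1, 2, 2, 3, 2, 1, 2, 3])
key = (s.diff() < 0).cumsum()
print(key.tolist())  # [0, 0, 0, 0, 1, 2, 2, 2]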

Example dataframe:

    x
0   1
1   2
2   2
3   2
4   3
5   3
6   4
7   5
8   4
9   3
10  2
11  1
12  2
13  3
14  4
15  5
16  5
17  6

Expected output:

group1:

   x
0  1
1  2
2  2
3  2
4  3
5  3
6  4
7  5

group2:

   x
0  1
1  2
2  3
3  4
4  5
5  5
6  6

CodePudding user response:

I'll map out all the steps (and keep all columns in the output) to replicate (df1['x'].diff() < 0).cumsum(), which is easy to calculate using a lag.

However, it is important that your data has an ID column that records the order of the dataframe, because unlike pandas, Spark does not retain a dataframe's ordering (due to its distributed nature). For this example, I've assumed that your data has an ID column named idx, which is the index printed in your example input.
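
If your data doesn't already have such a column, one option (an assumption on my part; it is only reliable when Spark reads the rows in your intended order, e.g. from a single ordered file) is to create one with monotonically_increasing_id():

from pyspark.sql import functions as func

# assumption: the read order matches the intended row order;
# monotonically_increasing_id() yields increasing (not consecutive) 64-bit IDs
data_sdf = data_sdf.withColumn('idx', func.monotonically_increasing_id())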

# input data
data_sdf.show(5)

# +---+---+
# |idx|val|
# +---+---+
# |  0|  1|
# |  1|  2|
# |  2|  2|
# |  3|  2|
# |  4|  3|
# +---+---+
# only showing top 5 rows

# calculating the group column
import sys

from pyspark.sql import functions as func
from pyspark.sql import Window as wd

data_sdf. \
    withColumn('diff_with_prevval', 
               func.col('val') - func.lag('val').over(wd.orderBy('idx'))
               ). \
    withColumn('diff_lt_0', 
               func.coalesce((func.col('diff_with_prevval') < 0).cast('int'), func.lit(0))
               ). \
    withColumn('diff_lt_0_cumsum', 
               # running sum from the first row to the current one
               func.sum('diff_lt_0').over(wd.orderBy('idx').rowsBetween(-sys.maxsize, 0))
               ). \
    show()

# +---+---+-----------------+---------+----------------+
# |idx|val|diff_with_prevval|diff_lt_0|diff_lt_0_cumsum|
# +---+---+-----------------+---------+----------------+
# |  0|  1|             null|        0|               0|
# |  1|  2|                1|        0|               0|
# |  2|  2|                0|        0|               0|
# |  3|  2|                0|        0|               0|
# |  4|  3|                1|        0|               0|
# |  5|  3|                0|        0|               0|
# |  6|  4|                1|        0|               0|
# |  7|  5|                1|        0|               0|
# |  8|  4|               -1|        1|               1|
# |  9|  3|               -1|        1|               2|
# | 10|  2|               -1|        1|               3|
# | 11|  1|               -1|        1|               4|
# | 12|  2|                1|        0|               4|
# | 13|  3|                1|        0|               4|
# | 14|  4|                1|        0|               4|
# | 15|  5|                1|        0|               4|
# | 16|  5|                0|        0|               4|
# | 17|  6|                1|        0|               4|
# +---+---+-----------------+---------+----------------+

You can now use the diff_lt_0_cumsum column in your groupBy() for further processing.
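
And, as a sketch of the last step from your question (keeping only groups with more than 10 rows), assuming the frame above is saved as grouped_sdf (a name I made up for illustration):

from pyspark.sql import functions as func
from pyspark.sql import Window as wd

# attach each group's row count, then keep only the large groups
result_sdf = grouped_sdf. \
    withColumn('group_size', func.count('*').over(wd.partitionBy('diff_lt_0_cumsum'))). \
    filter(func.col('group_size') > 10). \
    drop('group_size')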
