Home > Mobile >  How to filter highest and lowest rank in certain group on PySpark?
How to filter highest and lowest rank in certain group on PySpark?

Time:12-09

Here's my Input

id      year  month date  hour  minute  rank
54807   2021     12   31     6      29  1.0
54807   2021     12   31     6      31  2.0
54807   2021     12   31     7      15  1.0
54807   2021     12   31     7      18  2.0
54807   2021     12   31     7      30  3.0

Here's the pandas code:

df.loc[
   df.groupby(["id", "hour"])["rank"] \
     .agg(["idxmin", "idxmax"]) \
     .stack()
].sort_index()

Here's my output

id      year  month date  hour  minute  rank
54807   2021     12   31     6      29  1.0
54807   2021     12   31     6      31  2.0
54807   2021     12   31     7      15  1.0
54807   2021     12   31     7      30  3.0

CodePudding user response:

Spark does not have indices for rows. An increasing and unique (but not guaranteed consecutive) ID can be created using monotonically_increasing_id.

Then group by ["id", "hour"] and aggregate min & max IDs as an array.

Finally, join by ["id", "hour"] and filter records if ID is in min-max-id-array.

df = spark.createDataFrame(data=[[54807,2021,12,31,6,29,1.0],[54807,2021,12,31,6,31,2.0],[54807,2021,12,31,7,15,1.0],[54807,2021,12,31,7,18,2.0],[54807,2021,12,31,7,30,3.0]], schema=["id","year","month","date","hour","minute","rank"])

import pyspark.sql.functions as F
df = df.withColumn("mono_inc_id", F.monotonically_increasing_id())

df_grp = df.groupBy("id", "hour") \
       .agg(
            F.array(
                    F.min("mono_inc_id"), 
                    F.max("mono_inc_id")
            ).alias("min_max_id")
       )
df_grp.show()
 ----- ---- ---------- 
|   id|hour|min_max_id|
 ----- ---- ---------- 
|54807|   7|    [2, 4]|
|54807|   6|    [0, 1]|
 ----- ---- ---------- 

df = df.join(df_grp, on=["id", "hour"]) \
       .filter(F.array_contains("min_max_id", F.col("mono_inc_id"))) \
       .drop("mono_inc_id", "min_max_id")

df.show()
 ----- ---- ---- ----- ---- ------ ---- 
|   id|hour|year|month|date|minute|rank|
 ----- ---- ---- ----- ---- ------ ---- 
|54807|   7|2021|   12|  31|    15| 1.0|
|54807|   7|2021|   12|  31|    30| 3.0|
|54807|   6|2021|   12|  31|    29| 1.0|
|54807|   6|2021|   12|  31|    31| 2.0|
 ----- ---- ---- ----- ---- ------ ---- 

CodePudding user response:

You can use a window function:

df = spark.createDataFrame(
    [
     ('54807','2021','12','31','6','29','1.0')
    ,('54807','2021','12','31','6','31','2.0')
    ,('54807','2021','12','31','7','15','1.0')
    ,('54807','2021','12','31','7','18','2.0')
    ,('54807','2021','12','31','7','30','3.0')
    ],
    ['id','year','month','date','hour','minute','rank']
)


from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('id','hour')

df\
    .withColumn('max_rank', F.max('rank').over(w))\
    .withColumn('min_rank', F.min('rank').over(w))\
    .filter((F.col('rank')==F.col('max_rank'))|(F.col('rank')==F.col('min_rank')))\
    .drop('min_rank','max_rank')\
    .show()


#  ----- ---- ----- ---- ---- ------ ---- 
# |   id|year|month|date|hour|minute|rank|
#  ----- ---- ----- ---- ---- ------ ---- 
# |54807|2021|   12|  31|   6|    29| 1.0|
# |54807|2021|   12|  31|   6|    31| 2.0|
# |54807|2021|   12|  31|   7|    15| 1.0|
# |54807|2021|   12|  31|   7|    30| 3.0|
#  ----- ---- ----- ---- ---- ------ ---- 
  • Related