Here's my input:
   id  year  month  date  hour  minute  rank
54807  2021     12    31     6      29   1.0
54807  2021     12    31     6      31   2.0
54807  2021     12    31     7    15   1.0
54807  2021     12    31     7      18   2.0
54807  2021     12    31     7      30   3.0
Here's the pandas code:
df.loc[
    df.groupby(["id", "hour"])["rank"]
      .agg(["idxmin", "idxmax"])
      .stack()
].sort_index()
Here's my output:
   id  year  month  date  hour  minute  rank
54807  2021     12    31     6      29   1.0
54807  2021     12    31     6      31   2.0
54807  2021     12    31     7      15   1.0
54807  2021     12    31     7      30   3.0
How can I get the same result in PySpark?
CodePudding user response:
Spark does not have row indices, but an increasing and unique (though not guaranteed consecutive) ID can be created with monotonically_increasing_id.
Then group by ["id", "hour"] and aggregate the minimum and maximum IDs into an array.
Finally, join back on ["id", "hour"] and keep only the records whose ID appears in that min/max array.
df = spark.createDataFrame(
    data=[
        [54807, 2021, 12, 31, 6, 29, 1.0],
        [54807, 2021, 12, 31, 6, 31, 2.0],
        [54807, 2021, 12, 31, 7, 15, 1.0],
        [54807, 2021, 12, 31, 7, 18, 2.0],
        [54807, 2021, 12, 31, 7, 30, 3.0]
    ],
    schema=["id", "year", "month", "date", "hour", "minute", "rank"]
)
import pyspark.sql.functions as F

df = df.withColumn("mono_inc_id", F.monotonically_increasing_id())

df_grp = df.groupBy("id", "hour") \
    .agg(
        F.array(
            F.min("mono_inc_id"),
            F.max("mono_inc_id")
        ).alias("min_max_id")
    )
df_grp.show()
+-----+----+----------+
|   id|hour|min_max_id|
+-----+----+----------+
|54807|   7|    [2, 4]|
|54807|   6|    [0, 1]|
+-----+----+----------+
df = df.join(df_grp, on=["id", "hour"]) \
    .filter(F.array_contains("min_max_id", F.col("mono_inc_id"))) \
    .drop("mono_inc_id", "min_max_id")
df.show()
+-----+----+----+-----+----+------+----+
|   id|hour|year|month|date|minute|rank|
+-----+----+----+-----+----+------+----+
|54807|   7|2021|   12|  31|    15| 1.0|
|54807|   7|2021|   12|  31|    30| 3.0|
|54807|   6|2021|   12|  31|    29| 1.0|
|54807|   6|2021|   12|  31|    31| 2.0|
+-----+----+----+-----+----+------+----+
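The pandas version restores the original row order with sort_index(), but the join above does not preserve it (hour 7 comes back before hour 6). If the input order matters, one option, sketched here as a variant of the join step and assuming the original ordering is wanted, is to sort on the synthetic ID before dropping it:
# Variant of the join/filter step above: order by the synthetic ID before
# dropping it, so the surviving rows keep their original input order.
df = df.join(df_grp, on=["id", "hour"]) \
    .filter(F.array_contains("min_max_id", F.col("mono_inc_id"))) \
    .orderBy("mono_inc_id") \
    .drop("mono_inc_id", "min_max_id")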
CodePudding user response:
You can use a window function:
df = spark.createDataFrame(
    [
        ('54807', '2021', '12', '31', '6', '29', '1.0'),
        ('54807', '2021', '12', '31', '6', '31', '2.0'),
        ('54807', '2021', '12', '31', '7', '15', '1.0'),
        ('54807', '2021', '12', '31', '7', '18', '2.0'),
        ('54807', '2021', '12', '31', '7', '30', '3.0')
    ],
    ['id', 'year', 'month', 'date', 'hour', 'minute', 'rank']
)
from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('id', 'hour')

df\
    .withColumn('max_rank', F.max('rank').over(w))\
    .withColumn('min_rank', F.min('rank').over(w))\
    .filter((F.col('rank') == F.col('max_rank')) | (F.col('rank') == F.col('min_rank')))\
    .drop('min_rank', 'max_rank')\
    .show()
# +-----+----+-----+----+----+------+----+
# |   id|year|month|date|hour|minute|rank|
# +-----+----+-----+----+----+------+----+
# |54807|2021|   12|  31|   6|    29| 1.0|
# |54807|2021|   12|  31|   6|    31| 2.0|
# |54807|2021|   12|  31|   7|    15| 1.0|
# |54807|2021|   12|  31|   7|    30| 3.0|
# +-----+----+-----+----+----+------+----+
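Note that this reproduction creates every column, including rank, as a string, so the window min and max compare values lexicographically; that happens to work for these sample values. If the real rank column is numeric, as in the pandas example, casting it first keeps the comparison numeric. A minimal sketch, assuming rank holds numeric values:
# Cast rank to double so F.min/F.max over the window compare numbers,
# not strings (assumes rank is numeric in the real data).
df = df.withColumn('rank', F.col('rank').cast('double'))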