Apply a function every 60 rows in a pyspark dataframe


My dataframe is called df, has 123729 rows, and looks like this:

+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|     8|
+---+------+------+
I need to aggregate every 60 rows, i.e. every 60 seconds, into multiple values. For every minute I want to know the minimum heart rate, the average heart rate, the maximum heart rate, and whether maxABP was below 85 in any of those seconds. The desired output would look something like the table below, where Alarm is 1 if maxABP < 85 at any point during that minute, otherwise 0.

Min_HR Max_HR Avg_HR Alarm Minute
    70    100     80     1      1
    60     90     75     0      2

I'm wondering if it's possible to use mapreduce to aggregate every 60 rows to these single values. I know there is a lot wrong, but maybe something like this:

def max_HR(df, i):
   x = i
   y = i + 60
   return reduce(lambda x, y: max(df[x:y]))

df_maxHR = map(lambda i: max_HR(i))

Where i should be parts of the df.
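
To make the chunking idea concrete, this is roughly what I have in mind, written as plain Python over a list of rows (just a sketch; I know this is not how PySpark actually works, and the rows list is assumed):

from functools import reduce

# rows: assumed to be a plain Python list of (HR, maxABP, Second) tuples
def minute_max_hr(rows, i):
    chunk = rows[i:i + 60]                      # one minute = 60 consecutive rows
    return reduce(lambda a, b: max(a, b), (hr for hr, _, _ in chunk))

max_hr_per_minute = [minute_max_hr(rows, i) for i in range(0, len(rows), 60)]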

CodePudding user response:

I think groupBy is enough to get the required result. Aggregating with min('maxABP') per minute lets the Alarm column flag any minute in which maxABP dropped below 85 at some second, which is what was asked for.

df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|    10|
|110| 127.0|    20|
|111| 127.0|    30|
|111| 127.0|    40|
|111| 126.0|    50|
|111| 127.0|    60|
|109| 126.0|    70|
|111| 126.0|    80|
+---+------+------+

import pyspark.sql.functions as f

df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
  .groupBy('Minute').agg( \
    f.round(f.min('HR'), 2).alias('Min_HR'), \
    f.round(f.max('HR'), 2).alias('Max_HR'), \
    f.round(f.avg('HR'), 2).alias('Avg_HR'), \
    f.min('maxABP').alias('minABP')) \
  .withColumn('Alarm', f.expr('if(minABP < 85, 1, 0)')) \
  .show()

+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|minABP|Alarm|
+------+------+------+------+------+-----+
|     1|   109|   111|110.33| 126.0|    0|
|     0|   110|   111| 110.6| 126.0|    0|
+------+------+------+------+------+-----+
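
If the Minute label should start at 1 and cover seconds 1-60 exactly, as in the desired output, only the bucketing expression needs to change. A sketch of that variant (1-based labelling assumed, df_1based is a hypothetical name):

import pyspark.sql.functions as f

# 1-based minute buckets: seconds 1-60 -> Minute 1, 61-120 -> Minute 2, ...
df_1based = df.withColumn('Minute', f.expr('cast(floor((Second - 1) / 60) as int) + 1'))
# the groupBy / agg / Alarm steps above then run unchanged on df_1based

With the sample above, seconds 10-60 would fall into Minute 1 and seconds 70-80 into Minute 2.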

CodePudding user response:

Example DF:

df = spark.createDataFrame(
  [
     (110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
    ,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
    ,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
  ], ['HR', 'maxABP', 'Second']
)

+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|  1001|
|114| 126.0|  1003|
|115|  83.0|  1064|
|116| 127.0|  1066|
+---+------+------+

Then using window functions:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w1 = (Window.partitionBy(F.col('Minute')))

df\
  .withColumn('Minute', F.round(F.col('Second')/60, 0) + 1)\
  .withColumn('Min_HR', F.min('HR').over(w1))\
  .withColumn('Max_HR', F.max('HR').over(w1))\
  .withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))\
  .withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))\
  .select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')\
  .dropDuplicates()\
  .withColumn('Alarm', F.when(F.col('Min_ABP') < 85, 1).otherwise(F.lit(0)))\
  .select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')\
  .orderBy('Minute')\
  .show()

+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
|   109|   111| 110.0|    0|   1.0|
|   111|   114| 113.0|    0|  18.0|
|   115|   116| 116.0|    1|  19.0|
+------+------+------+-----+------+
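
One caveat: F.round(F.col('Second')/60, 0) + 1 puts the bucket boundaries at the half minute (seconds 31 and 89, for example, land in the same Minute), and the label comes out as a double (1.0, 18.0, ...). If strict 60-second buckets are needed, a floor-based expression (as sketched after the first answer) can feed the same window pipeline. A sketch, with the rest of the chain unchanged and df_binned as a hypothetical name:

import pyspark.sql.functions as F

# assumed 1-based, aligned buckets: seconds 1-60 -> Minute 1, 61-120 -> Minute 2, ...
df_binned = df.withColumn('Minute', F.floor((F.col('Second') - 1) / 60) + 1)
# then compute Min_HR / Max_HR / Avg_HR / Alarm over Window.partitionBy('Minute') as above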