My dataframe is called df, has 123729 rows, and looks like this:
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|     8|
+---+------+------+
I need to aggregate every 60 rows (i.e. every 60 seconds) into a few values. For every minute, I want the minimum heart rate, the average heart rate, the maximum heart rate, and whether maxABP was below 85 in any of those seconds. The desired output would look something like the table below, where Alarm is 1 if maxABP dropped below 85 during that minute, otherwise 0.
Min_HR | Max_HR | Avg_HR | Alarm | Minute |
---|---|---|---|---|
70 | 100 | 80 | 1 | 1 |
60 | 90 | 75 | 0 | 2 |
I'm wondering if it's possible to use map/reduce to aggregate every 60 rows to these single values. I know there is a lot wrong, but maybe something like this:

def max_HR(df, i):
    x = i
    y = i + 60
    return reduce(lambda x, y: max(df[x:y]))

df_maxHR = map(lambda i: max_HR(i))

Where i should be parts of the df.
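For what it's worth, the chunk-of-60 idea could be spelled out in plain Python roughly like this; hr here is a hypothetical list holding one HR value per second, not a column of the actual df:

from functools import reduce

# hr is a hypothetical plain list with one HR value per second
chunks = [hr[i:i + 60] for i in range(0, len(hr), 60)]

# one reduce per 60-second chunk, e.g. the per-minute maximum heart rate
max_hr_per_minute = [reduce(lambda a, b: max(a, b), chunk) for chunk in chunks]

Though, as the answers below show, a grouped aggregation expresses this far more directly than mapping and reducing over slices.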
CodePudding user response:
I think the groupBy is enough to get the required result.
df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|    10|
|110| 127.0|    20|
|111| 127.0|    30|
|111| 127.0|    40|
|111| 126.0|    50|
|111| 127.0|    60|
|109| 126.0|    70|
|111| 126.0|    80|
+---+------+------+
import pyspark.sql.functions as f

# bucket the seconds into minutes, aggregate HR per minute, and keep the
# lowest maxABP so the alarm fires if any second in the minute was below 85
df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) \
  .groupBy('Minute').agg(
      f.round(f.min('HR'), 2).alias('Min_HR'),
      f.round(f.max('HR'), 2).alias('Max_HR'),
      f.round(f.avg('HR'), 2).alias('Avg_HR'),
      f.min('maxABP').alias('Min_ABP')) \
  .withColumn('Alarm', f.expr('if(Min_ABP < 85, 1, 0)')) \
  .show()
+------+------+------+------+-------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|Min_ABP|Alarm|
+------+------+------+------+-------+-----+
|     1|   109|   111|110.33|  126.0|    0|
|     0|   110|   111| 110.6|  126.0|    0|
+------+------+------+------+-------+-----+
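One thing to watch: cast(Second / 60 as int) puts seconds 1-59 into minute 0 and second 60 into minute 1. If 1-based minutes covering seconds 1-60, 61-120, ... are wanted, as in the desired output, a sketch of that variant (not part of the answer above, reusing f from the import) could look like this:

# floor((Second - 1) / 60) + 1 gives minute 1 for seconds 1-60, minute 2 for 61-120, ...
df.withColumn('Minute', f.floor((f.col('Second') - 1) / 60) + 1) \
  .groupBy('Minute').agg(
      f.min('HR').alias('Min_HR'),
      f.max('HR').alias('Max_HR'),
      f.round(f.avg('HR'), 2).alias('Avg_HR'),
      # Alarm is 1 if maxABP dropped below 85 at any second of the minute
      f.max(f.when(f.col('maxABP') < 85, 1).otherwise(0)).alias('Alarm')) \
  .orderBy('Minute') \
  .show()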
CodePudding user response:
Example DF:
df = spark.createDataFrame(
    [
        (110, 128.0, 1), (110, 127.0, 2), (111, 127.0, 3), (111, 127.0, 4),
        (111, 126.0, 5), (111, 127.0, 6), (109, 126.0, 7), (111, 126.0, 1001),
        (114, 126.0, 1003), (115, 83.0, 1064), (116, 127.0, 1066)
    ], ['HR', 'maxABP', 'Second']
)
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|  1001|
|114| 126.0|  1003|
|115|  83.0|  1064|
|116| 127.0|  1066|
+---+------+------+
Then using window functions:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# each row gets its Minute bucket, then the per-minute statistics are computed
# as window aggregates partitioned by that bucket
w1 = Window.partitionBy(F.col('Minute'))

df\
    .withColumn('Minute', F.round(F.col('Second') / 60, 0) + 1)\
    .withColumn('Min_HR', F.min('HR').over(w1))\
    .withColumn('Max_HR', F.max('HR').over(w1))\
    .withColumn('Avg_HR', F.round(F.avg('HR').over(w1), 0))\
    .withColumn('Min_ABP', F.round(F.min('maxABP').over(w1), 0))\
    .select('Min_HR', 'Max_HR', 'Min_ABP', 'Avg_HR', 'Minute')\
    .dropDuplicates()\
    .withColumn('Alarm', F.when(F.col('Min_ABP') < 85, 1).otherwise(F.lit(0)))\
    .select('Min_HR', 'Max_HR', 'Avg_HR', 'Alarm', 'Minute')\
    .orderBy('Minute')\
    .show()
+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
|   109|   111| 110.0|    0|   1.0|
|   111|   114| 113.0|    0|  18.0|
|   115|   116| 116.0|    1|  19.0|
+------+------+------+-----+------+
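As a small follow-up (not part of the answer above): if the chained expression is assigned to a variable, say result_df (a hypothetical name), the minutes that actually raised an alarm can be pulled out with a plain filter:

# result_df is assumed to hold the aggregated frame built above
result_df.filter(F.col('Alarm') == 1).select('Minute', 'Avg_HR').show()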