I have a big PySpark data frame that looks like this:
from pyspark.sql.functions import col, to_timestamp
data = [('2010-09-12 0', 'x1', 13),
        ('2010-09-12 0', 'x2', 12),
        ('2010-09-12 2', 'x3', 23),
        ('2010-09-12 4', 'x1', 22),
        ('2010-09-12 4', 'x2', 32),
        ('2010-09-12 4', 'x3', 7),
        ('2010-09-12 6', 'x3', 24),
        ('2010-09-12 16', 'x3', 34)]
columns = ['timestamp', 'category', 'value']
df = spark.createDataFrame(data=data, schema=columns)
df = df.withColumn('ts', to_timestamp(col('timestamp'), 'yyyy-MM-dd H')).drop(col('timestamp'))
df.show()
+--------+-----+-------------------+
|category|value|                 ts|
+--------+-----+-------------------+
|      x1|   13|2010-09-12 00:00:00|
|      x2|   12|2010-09-12 00:00:00|
|      x3|   23|2010-09-12 02:00:00|
|      x1|   22|2010-09-12 04:00:00|
|      x2|   32|2010-09-12 04:00:00|
|      x3|    7|2010-09-12 04:00:00|
|      x3|   24|2010-09-12 06:00:00|
|      x3|   34|2010-09-12 16:00:00|
+--------+-----+-------------------+
The timestamp in column ts increases in exact 2-hour steps (e.g. hours 0, 2, ..., 22). I want to compute the average, min, max, and median of column value grouped by the ts timestamp, and put these statistics into a pandas data frame like the following (with start_ts and end_ts bounding the time window of interest):
import pandas as pd
import datetime
start_ts = datetime.datetime(year=2010, month=2, day=1, hour=0)
end_ts = datetime.datetime(year=2022, month=6, day=1, hour=22)
ts                   average  min  max  median
...
2010-09-12 00:00:00     12.5   12   13    12.5
2010-09-12 02:00:00     23     23   23    23
2010-09-12 04:00:00     20.3    7   32    22
2010-09-12 06:00:00     24     24   24    24
2010-09-12 16:00:00     34     34   34    34
...
What would be an economical way to do this, minimizing the number of passes over the PySpark DataFrame?
CodePudding user response:
Aggregate, then convert the result to pandas:
from pyspark.sql import functions as F
df1 = df.groupby("ts").agg(
    F.avg("value").alias("average"),
    F.min("value").alias("min"),
    F.max("value").alias("max"),
    F.percentile_approx("value", 0.5).alias("median")
)
result = df1.toPandas()
#                    ts    average  min  max  median
# 0 2010-09-12 00:00:00  12.500000   12   13      12
# 1 2010-09-12 02:00:00  23.000000   23   23      23
# 2 2010-09-12 04:00:00  20.333333    7   32      22
# 3 2010-09-12 06:00:00  24.000000   24   24      24
# 4 2010-09-12 16:00:00  34.000000   34   34      34
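percentile_approx computes an approximate median in a single pass. If you also want to restrict the result to the start_ts / end_ts window defined in the question, a filter before the aggregation keeps everything in one Spark job; a minimal sketch, reusing the variables from the question:
from pyspark.sql import functions as F

# bound the data to the requested window before aggregating,
# reusing the start_ts / end_ts datetime objects from the question
df1 = (df
    .filter(F.col("ts").between(F.lit(start_ts), F.lit(end_ts)))
    .groupby("ts")
    .agg(
        F.avg("value").alias("average"),
        F.min("value").alias("min"),
        F.max("value").alias("max"),
        F.percentile_approx("value", 0.5).alias("median"),
    )
)
# sort chronologically and use ts as the index on the pandas side
result = df1.toPandas().sort_values("ts").set_index("ts")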
CodePudding user response:
The following calculates the exact median; keep in mind that an exact median can be expensive for very large groups. Also, you can filter the data without the datetime module:
from pyspark.sql import functions as F
df = (df
    .filter(F.col('ts').between('2010-02-01 00:00:00', '2022-06-01 22:00:00'))
    .groupBy('ts').agg(
        F.round(F.mean('value'), 1).alias('average'),
        F.min('value').alias('min'),
        F.max('value').alias('max'),
        F.expr('percentile(value, .5)').alias('median'),
    )
)
pdf = df.toPandas()
print(pdf)
#                    ts  average  min  max  median
# 0 2010-09-12 02:00:00     23.0   23   23    23.0
# 1 2010-09-12 00:00:00     12.5   12   13    12.5
# 2 2010-09-12 06:00:00     24.0   24   24    24.0
# 3 2010-09-12 16:00:00     34.0   34   34    34.0
# 4 2010-09-12 04:00:00     20.3    7   32    22.0
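The groupBy output above is not in chronological order. If you want the pandas frame sorted by ts and indexed by it, as in the desired output, a small follow-up step on the pandas side is enough, for example:
# sort chronologically and use ts as the index
pdf = pdf.sort_values('ts').set_index('ts')
print(pdf)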