Create Pandas data frame with statistics from PySpark data frame

Time:06-26

I have a big PySpark data frame that looks like this:

from pyspark.sql.functions import col, to_timestamp

data = [('2010-09-12 0', 'x1', 13), 
        ('2010-09-12 0', 'x2', 12), 
        ('2010-09-12 2', 'x3', 23), 
        ('2010-09-12 4', 'x1', 22), 
        ('2010-09-12 4', 'x2', 32), 
        ('2010-09-12 4', 'x3', 7), 
        ('2010-09-12 6', 'x3', 24),
        ('2010-09-12 16', 'x3', 34),]

columns = ['timestamp', 'category', 'value']
df = spark.createDataFrame(data=data, schema=columns)
df = df.withColumn('ts', to_timestamp(col('timestamp'), 'yyyy-MM-dd H')).drop(col('timestamp'))
df.show()

+--------+-----+-------------------+
|category|value|                 ts|
+--------+-----+-------------------+
|      x1|   13|2010-09-12 00:00:00|
|      x2|   12|2010-09-12 00:00:00|
|      x3|   23|2010-09-12 02:00:00|
|      x1|   22|2010-09-12 04:00:00|
|      x2|   32|2010-09-12 04:00:00|
|      x3|    7|2010-09-12 04:00:00|
|      x3|   24|2010-09-12 06:00:00|
|      x3|   34|2010-09-12 16:00:00|
+--------+-----+-------------------+

The timestamps in column ts fall on exact 2-hour intervals (e.g. hours 0, 2, ..., 22).

I want to extract the average, min, max and median of column value for each ts timestamp, and put these statistics into a pandas data frame like the following:

import pandas as pd
import datetime

start_ts = datetime.datetime(year=2010, month=2, day=1, hour=0)
end_ts = datetime.datetime(year=2022, month=6, day=1, hour=22)

ts                      average   min    max   median 
...
2010-09-12 00:00:00     12.5      12     13    12.5
2010-09-12 02:00:00     23        23     23    23
2010-09-12 04:00:00     20.3      7      32    22
2010-09-12 06:00:00     24        24     24    24
2010-09-12 16:00:00     34        34     34    34
...

What would be an economical way to do this, minimizing the number of iterations over the pyspark dataframe?

CodePudding user response:

Aggregate in Spark first, then convert the (much smaller) result into pandas:

from pyspark.sql import functions as F

df1 = df.groupby("ts").agg(
    F.avg("value").alias("average"),
    F.min("value").alias("min"),
    F.max("value").alias("max"),
    F.percentile_approx("value", 0.5).alias("median")
)

result = df1.toPandas()

#                    ts    average  min  max  median
# 0 2010-09-12 00:00:00  12.500000   12   13      12
# 1 2010-09-12 02:00:00  23.000000   23   23      23
# 2 2010-09-12 04:00:00  20.333333    7   32      22
# 3 2010-09-12 06:00:00  24.000000   24   24      24
# 4 2010-09-12 16:00:00  34.000000   34   34      34
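
Note that the median for 00:00 comes out as 12 here, not the 12.5 shown in the question. percentile_approx returns a value that actually occurs in the group instead of interpolating between the two middle values. The plain-Python sketch below is only an analogy for that difference (it is not Spark's algorithm). It uses the standard statistics module on the sample values from the question; the names groups and summary are made up for illustration.

```python
import statistics

# Sample values from the question, grouped by hour of 2010-09-12.
groups = {
    "00:00": [13, 12],
    "02:00": [23],
    "04:00": [22, 32, 7],
    "06:00": [24],
    "16:00": [34],
}

summary = {}
for ts, values in groups.items():
    summary[ts] = {
        # median_low picks an element that exists in the group
        "median_low": statistics.median_low(values),
        # median averages the two middle values of an even-sized group
        "median": statistics.median(values),
    }

for ts, row in summary.items():
    print(ts, row)
```

The two only differ for groups with an even number of rows, so whether the approximate version is acceptable depends on how much that half-step matters for your data.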

CodePudding user response:

The following calculates the exact median. Be careful with it on very large groups, though: an exact percentile has to take every value in a group into account, which can be far more expensive than percentile_approx.

Also, you can filter the data by date range without the datetime module.

from pyspark.sql import functions as F
df = (df
    # include the time in the upper bound; a date-only string would stop at midnight
    .filter(F.col('ts').between('2010-02-01', '2022-06-01 22:00:00'))
    .groupBy('ts').agg(
        F.round(F.mean('value'), 1).alias('average'),
        F.min('value').alias('min'),
        F.max('value').alias('max'),
        F.expr('percentile(value, .5)').alias('median'),
    )
)
pdf = df.toPandas()
print(pdf)
#                    ts  average  min  max  median
# 0 2010-09-12 02:00:00     23.0   23   23    23.0
# 1 2010-09-12 00:00:00     12.5   12   13    12.5
# 2 2010-09-12 06:00:00     24.0   24   24    24.0
# 3 2010-09-12 16:00:00     34.0   34   34    34.0
# 4 2010-09-12 04:00:00     20.3    7   32    22.0
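
One detail worth checking with between on a timestamp column: as far as I know, a date-only string bound is cast to midnight of that day, so rows later on the last day fall outside the range. The sketch below illustrates the comparison with Python's datetime only. It assumes Spark's string-to-timestamp cast behaves the same way, and in_range is a hypothetical helper, not a PySpark function.

```python
from datetime import datetime

# Upper limit wanted in the question: 2022-06-01 22:00.
end_ts = datetime(year=2022, month=6, day=1, hour=22)

# A date-only bound parses to midnight at the start of that day.
lower = datetime.fromisoformat("2010-02-01")
date_only_upper = datetime.fromisoformat("2022-06-01")
full_upper = datetime.fromisoformat("2022-06-01 22:00:00")


def in_range(ts, lo, hi):
    """Inclusive on both ends, like SQL BETWEEN."""
    return lo <= ts <= hi


print(in_range(end_ts, lower, date_only_upper))  # False
print(in_range(end_ts, lower, full_upper))       # True
```

If the last day should be covered in full, spelling out the time in the upper bound avoids silently dropping those rows.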