max aggregation on grouped spark dataframe returns wrong value


I have a Spark dataframe containing two columns (CPID and PluginDuration). I need to find the maximum PluginDuration and the average PluginDuration for each CPID in the dataframe.

Filtering the dataframe for CPID AN04773 returns the rows below:

df.filter('CPID = "AN04773"').show(10)

Result: 
+-------+--------------+
|   CPID|PluginDuration|
+-------+--------------+
|AN04773|   1.933333333|
|AN04773|   13.03444444|
|AN04773|        9.2875|
|AN04773|   20.50027778|
+-------+--------------+

When I did a groupBy on the CPID column of the dataframe to find the max and average plugin duration, as below, I found that the max value returned for some CPIDs is not as expected. For example, for CPID AN04773 (the same CPID used to show rows from the original df above), the max PluginDuration should be 20.50027778, but the result of the code below gives a max value of 9.2875, which is not right.

from pyspark.sql import functions as F

fdf = df.groupBy('CPID').agg(
    F.max('PluginDuration').alias('max_duration'),
    F.avg('PluginDuration').alias('avg_duration')
)
fdf.filter('CPID = "AN04773"').show()

Result:
+-------+------------+--------------+
|   CPID|max_duration|  avg_duration|
+-------+------------+--------------+
|AN04773|      9.2875|11.18888888825|
+-------+------------+--------------+

I want to know why it's not working as expected.

CodePudding user response:

The wrong result happens because PluginDuration is not a numeric column but a string column. All you have to do is cast the PluginDuration column to a numeric type (double, float, etc.).
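For the PySpark dataframe from the question, a minimal sketch of that fix (reusing the df and column names from the question; casting to double is just one choice of numeric type):

from pyspark.sql import functions as F

# Cast the string column to double first; without the cast, max() compares
# the values lexicographically as strings.
fdf = df.withColumn('PluginDuration', F.col('PluginDuration').cast('double')) \
        .groupBy('CPID') \
        .agg(F.max('PluginDuration').alias('max_duration'),
             F.avg('PluginDuration').alias('avg_duration'))
fdf.filter('CPID = "AN04773"').show()

You can confirm the original column type with df.printSchema(); PluginDuration showing up as string confirms the problem.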

Here is your issue (reproduced in Scala, but it behaves the same in PySpark):

import org.apache.spark.sql.functions.{avg, col, max}

val data = Seq(("AN04773", "1.933333333"), ("AN04773", "13.03444444"), ("AN04773", "9.2875"), ("AN04773", "20.50027778")).toDF("id", "value")

// "value" is a string column here, so max() compares the values as strings
data.groupBy("id").agg(max("value"), avg("value")).show()

// output:
+-------+----------+--------------+
|     id|max(value)|    avg(value)|
+-------+----------+--------------+
|AN04773|    9.2875|11.18888888825|
+-------+----------+--------------+

But after casting the value column to double, we get the correct values:

// Casting "value" to double makes max() compare numerically
data.withColumn("value", col("value").cast("double")).groupBy("id").agg(max("value"), avg("value")).show()

// output:
+-------+-----------+--------------+
|     id| max(value)|    avg(value)|
+-------+-----------+--------------+
|AN04773|20.50027778|11.18888888825|
+-------+-----------+--------------+

CodePudding user response:

Since no numeric datatype is defined on the column, Spark treats it as a string, and in string comparison "9" is greater than "2", so the maximum comes out as 9.2875.

Note: if the column is a string in PySpark, you will get the same result as in the Scala example above; casting it to a numeric type fixes the aggregation.
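A quick plain-Python illustration of that string comparison, using the four values from the question:

durations = ['1.933333333', '13.03444444', '9.2875', '20.50027778']

# Lexicographic comparison: '9' > '2' > '1', so '9.2875' sorts highest
print(max(durations))                      # 9.2875
print(max(float(d) for d in durations))    # 20.50027778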
