Spark window aggregate function not working intuitively with records ordering


I have the following example, which I am running on Spark 3.3:

import pyspark.sql.functions as F
from pyspark.sql import Window

inputData = [
  ("1", 333),
  ("1", 222),
  ("1", 111),
  ("2", 334)
]
inputDf = spark.createDataFrame(inputData, schema=["id", "val"])

window = Window.partitionBy("id")
aggregatedDf = (
    inputDf.withColumn("min_val", F.min(F.col("val")).over(window))
    .withColumn("max_val", F.max(F.col("val")).over(window))
)
aggregatedDf.show()

The output is as expected; I get the correct min/max value for each window:

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    111|    333|
|  1|222|    111|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

When I add orderBy to the window, the output is different:

window = Window.partitionBy("id").orderBy(F.col("val").desc())

+---+---+-------+-------+
| id|val|min_val|max_val|
+---+---+-------+-------+
|  1|333|    333|    333|
|  1|222|    222|    333|
|  1|111|    111|    333|
|  2|334|    334|    334|
+---+---+-------+-------+

As you can see, with descending ordering max_val is still correct, but min_val changes from record to record.

I tried to find more information in the docs and here on SO, but had no luck. For me it's not intuitive at all.

My expectation was that Spark would scan all records in a given partition and assign the same min/max value to every record within that partition. That is what happens without ordering in the window, but it works differently once ordering is added.

Does anyone know why it works like this?

CodePudding user response:

You need to add an explicit frame to the window. When no orderBy is defined, Spark uses an unbounded frame over the whole partition (rowsBetween unboundedPreceding and unboundedFollowing). As soon as you add orderBy, the default frame becomes a growing one: rangeBetween unboundedPreceding and currentRow. With val sorted descending, the frame ending at the current row always has the current row's value as its minimum, which is exactly what you see in min_val, while max_val (the first, largest value in the partition) is unaffected. To keep the ordering and still aggregate over the whole partition, specify the frame explicitly.
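A minimal sketch against the same inputDf from the question, assuming the goal is the full-partition min while keeping the descending order; the explicit rowsBetween frame is the fix, and the rangeBetween frame reproduces the output you observed:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Ordered window with an explicit full-partition frame:
# restores the "scan the whole partition" behaviour you expected.
full_window = (
    Window.partitionBy("id")
    .orderBy(F.col("val").desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# For comparison, this is what Spark uses implicitly once orderBy is present:
# a growing frame from the partition start up to the current row.
default_window = (
    Window.partitionBy("id")
    .orderBy(F.col("val").desc())
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

(
    inputDf
    .withColumn("min_val_full", F.min("val").over(full_window))
    .withColumn("min_val_default", F.min("val").over(default_window))
    .show()
)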

To understand more about window frames, see https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html

https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86
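For reference, the same explicit frame can be written in Spark SQL with a ROWS BETWEEN clause; this sketch assumes inputDf is registered under a temp view name of "input" (a hypothetical name, not from the original post):

# Expose the DataFrame to Spark SQL under an illustrative view name.
inputDf.createOrReplaceTempView("input")

spark.sql("""
    SELECT id,
           val,
           MIN(val) OVER (
               PARTITION BY id
               ORDER BY val DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
           ) AS min_val,
           MAX(val) OVER (
               PARTITION BY id
               ORDER BY val DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
           ) AS max_val
    FROM input
""").show()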
