Home > other >  Convert Spark SQL to Scala using Window function partitioned by aggregate
Convert Spark SQL to Scala using Window function partitioned by aggregate

Time:04-13

I have the following Spark SQL query:

val subquery = 
        "( select garment_group_name , prod_name, "  
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum "  
        "from articles a1 "  
        "group by garment_group_name, prod_name )"


val query = "SELECT garment_group_name, prod_name "  
            "FROM "   subquery  
            " WHERE seqnum = 1 "


val query3 = spark.sql(query)

I am trying to do that exact same thing however as a Data frame API. I wanted to just first concentrate on the subquery part and I did something like this

import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.partitionBy("garment_group_name")

articlesDF.withColumn("row_number", row_number.over(windowSpec))
    .show()

However I get the following error

org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$33.applyOrElse(Analyzer.scala:2207)......... and so on.

I see that I need to include an orderBy clause but how can I do this if I am actually first counting from a group by on two columns and then comes an order by?

The warning gives the example: SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I do not know how to do this as a data frame API and I don't see this online.

CodePudding user response:

The solution is to first perform count("prod_name") in a Window which is partitioned by both "garment_group_name" and "prod_name" which is then used in windowSpec.

Starting with some example data:

val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
)
.toDF("garment_group_name", "prod_name")

df.show(false)

gives:

 ------------------ --------- 
|garment_group_name|prod_name|
 ------------------ --------- 
|a                 |aa1      |
|a                 |aa2      |
|a                 |aa3      |
|b                 |bb       |
 ------------------ --------- 

and the two window functions we need:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
val windowSpec      = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)

We can then use them:

df
    // create the `count` column to be used by `windowSpec`
    .withColumn("count", count(col("prod_name")).over(countWindowSpec))
    .withColumn("seqnum", row_number.over(windowSpec))
    // take only the first row of each partition
    .filter(col("seqnum") === 1)
    // select only the rows we care about
    .select("garment_group_name", "prod_name")
    .show(false)

which gives:

 ------------------ --------- 
|garment_group_name|prod_name|
 ------------------ --------- 
|a                 |aa1      |
|b                 |bb       |
 ------------------ --------- 

Comparing this to your SQL implementation, using the same df:

df.createOrReplaceTempView("a1")

val subquery = 
        "( select garment_group_name , prod_name, "  
        "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum "  
        "from a1 "  
        "group by garment_group_name, prod_name )"

val query = "SELECT garment_group_name, prod_name "  
            "FROM "   subquery  
            " WHERE seqnum = 1 "

spark.sql(query).show(false)

we get the same result of:

 ------------------ --------- 
|garment_group_name|prod_name|
 ------------------ --------- 
|a                 |aa1      |
|b                 |bb       |
 ------------------ --------- 
  • Related