I have the following Spark SQL query:
val subquery =
  "( select garment_group_name , prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from articles a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
val query3 = spark.sql(query)
I am trying to do the exact same thing, but with the DataFrame API. I wanted to concentrate on the subquery part first, so I did something like this:
import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number
val windowSpec = Window.partitionBy("garment_group_name")
articlesDF.withColumn("row_number", row_number.over(windowSpec))
.show()
However, I get the following error:
org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$33.applyOrElse(Analyzer.scala:2207)......... and so on.
I see that I need to include an orderBy clause, but how can I do this if I am actually first counting from a group by on two columns, and only then comes the order by?
The error message gives the example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I do not know how to do this with the DataFrame API, and I don't see this online.
CodePudding user response:
The solution is to first compute count("prod_name") over a window partitioned by both "garment_group_name" and "prod_name", and then order by that count column in windowSpec.
Starting with some example data:
import spark.implicits._ // needed for .toDF on a local collection
val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
).toDF("garment_group_name", "prod_name")
df.show(false)
gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|a                 |aa2      |
|a                 |aa3      |
|b                 |bb       |
+------------------+---------+
and the two window specifications we need:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}
val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
val windowSpec = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)
We can then use them:
df
// create the `count` column to be used by `windowSpec`
.withColumn("count", count(col("prod_name")).over(countWindowSpec))
.withColumn("seqnum", row_number.over(windowSpec))
// take only the first row of each partition
.filter(col("seqnum") === 1)
// select only the rows we care about
.select("garment_group_name", "prod_name")
.show(false)
which gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
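To see what the window columns look like before filtering, you can show the intermediate result (a quick sketch reusing the df and window specs from above):
df
  // count of each (garment_group_name, prod_name) pair
  .withColumn("count", count(col("prod_name")).over(countWindowSpec))
  // rank within each garment_group_name by descending count
  .withColumn("seqnum", row_number.over(windowSpec))
  .show(false)
Each row keeps the count of its (garment_group_name, prod_name) pair, and seqnum numbers the rows within each garment_group_name by descending count, which is why filtering on seqnum = 1 keeps exactly one row per group.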
Comparing this to your SQL implementation, using the same df:
df.createOrReplaceTempView("a1")
val subquery =
  "( select garment_group_name , prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
spark.sql(query).show(false)
we get the same result of:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
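As a side note, if you want the DataFrame version to mirror the SQL's group by more literally, you can aggregate first and then apply the same windowSpec; this is just a sketch under the assumption that you name the aggregated column count so windowSpec can order by it:
df
  // mirror the SQL's "group by garment_group_name, prod_name"
  .groupBy("garment_group_name", "prod_name")
  .agg(count("prod_name").as("count"))
  .withColumn("seqnum", row_number.over(windowSpec))
  .filter(col("seqnum") === 1)
  .select("garment_group_name", "prod_name")
  .show(false)
This yields the same result, since row_number assigns 1 to the prod_name with the highest count within each garment_group_name.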