I have a Spark dataframe as below.
val df = Seq(("a",1,1400),("a",1,1250),("a",2,1200),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",2,500),("b",4,250),("b",4,200),("b",4,100),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
I am working in Scala and, starting from this dataframe, I am trying to get the result shown below (written out here as a second dataframe for clarity).
val expected = Seq(("a",1,1400),("a",4,1250),("a",4,1200),("a",4,1100),("b",2,2500),("b",2,1250),("b",4,250),("b",4,200),("b",4,100),("b",5,800)).
toDF("id","hierarchy","amount")
Rules: grouping by id, if min(hierarchy) == 1 then I take the single row with the highest amount, and then I look at the rows with hierarchy >= 4 and take up to 3 rows for each such hierarchy level, in descending order of amount. If min(hierarchy) == 2 then I take the two rows with the highest amount, and again take up to 3 rows for each hierarchy level >= 4, in descending order of amount. And so on for all the ids in the data. For example, for id "a" the minimum hierarchy is 1, so I keep (a,1,1400) plus the top 3 rows of hierarchy 4; for id "b" the minimum is 2, so I keep (b,2,2500) and (b,2,1250) plus the top 3 rows of hierarchy 4 and the single hierarchy 5 row. A plain-Scala sketch of this logic follows.
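For illustration only, here is a sketch of the selection I am after using plain Scala collections on the sample data above (the names data and picked are just for this example; this is not the Spark solution I am looking for):

// Plain Scala collections sketch of the rules, for illustration only.
val data = Seq(
  ("a",1,1400),("a",1,1250),("a",2,1200),("a",4,1250),("a",4,1200),("a",4,1100),
  ("b",2,2500),("b",2,1250),("b",2,500),("b",4,250),("b",4,200),("b",4,100),("b",4,100),("b",5,800))

val picked = data.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (id, rows) =>
  val minH = rows.map(_._2).min
  // take min(hierarchy) rows with the highest amount
  val top  = rows.sortBy(r => -r._3).take(minH)
  // then, for every hierarchy level >= 4, take up to 3 rows by descending amount
  val high = rows.filter(_._2 >= 4).groupBy(_._2).toSeq.sortBy(_._1)
    .flatMap { case (_, hs) => hs.sortBy(r => -r._3).take(3) }
  top ++ high
}
picked.foreach(println)  // matches the expected rows listed above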
Thanks for the suggestions.
CodePudding user response:
You may use window functions to generate the criteria that you will filter on, e.g.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val results = df
  // minimum hierarchy per id
  .withColumn("minh", min("hierarchy").over(Window.partitionBy("id")))
  // rank of each row within its id by descending amount
  .withColumn("rnk", rank().over(Window.partitionBy("id").orderBy(col("amount").desc)))
  // row number within each (id, hierarchy) group, computed only for hierarchy >= 4
  .withColumn(
    "rn4",
    when(col("hierarchy") >= 4,
      row_number().over(Window.partitionBy("id", "hierarchy").orderBy(col("amount").desc))
    ).otherwise(5)
  )
  .filter("rnk <= minh or rn4 <= 3")
  .select("id", "hierarchy", "amount")
NB: a more restrictive filter, if only groups whose minimum hierarchy is 1 or 2 should be kept, would be .filter("(rnk <= minh or rn4 <= 3) and (minh in (1,2))")
The temporary columns generated above by the window functions, which drive the filtering criteria, are:
minh: the minimum hierarchy within each id group; it is used to select the top minh rows (by amount) from that group via rnk <= minh.
rnk: the rank of each row within its id group, by descending amount.
rn4: the row number within each (id, hierarchy) group, by descending amount, computed only for rows with hierarchy >= 4; rn4 <= 3 keeps at most three rows per such hierarchy level.
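If it helps to verify, the result can be displayed and compared against the expected rows from the question (the ordering below is only for readability):

results.orderBy(col("id"), col("hierarchy"), col("amount").desc).show(false)
// expected, per the question:
//   a -> (1,1400), (4,1250), (4,1200), (4,1100)
//   b -> (2,2500), (2,1250), (4,250), (4,200), (4,100), (5,800)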