Home > Enterprise >  Rank of value within a subset based on range of another column
Rank of value within a subset based on range of another column

Time:10-20

I have the following DataFrame (call it dfTest):

item position value
a 1 8
b 2 9
c 3 10
d 4 7
e 5 9

What I am trying to do is that, given a distance limit 1, find out the ranked index (sort by descending manner) of value in each row within a subset of rows whose distances (defined by position column) to this item are not greater than 1.

For example, the desired output (call it dfResult) should be:

item position value rank
a 1 8 2
b 2 9 2
c 3 10 1
d 4 7 3
e 5 9 1

Explanation:

  1. For a row. Since b is the only item whose distance to a is <= 1 (abs(2-1)=1), the sorted subset of value is [9,8] and a's value is 2nd in order, therefore the rank of a is 2.
  2. For b row. Here, a and c are the items with distances to b <= 1 , the sorted subset of value is [10,9,8] and b's value is 2nd in order, therefore the rank of b is 2.
  3. For c row. Here, b and d are the items with distances to c <= 1 , the sorted subset of value is [10,9,7] and c's value is 1st in order, therefore the rank of c is 1.
  4. ...

Any solution in Spark SQL, Scala or PySpark is appreciated!

CodePudding user response:

You may use a self-join to determine related items across the entire table (eg in the event you had an item f with a position of 2 also, a's rank may change) and the rank window function to achieve your desired rank.

Examples using spark-sql, pyspark and scala are included below:

Using spark-sql

SELECT
    d.item,
    d.position,
    d.value,
    d.rank
FROM (
    SELECT
        i1.*,
        i2.item as other_item,
        RANK() OVER (
            PARTITION BY i1.item
            ORDER BY i2.value DESC
        ) as rank
    FROM
        dfTest i1
    INNER JOIN
        dfTest i2 ON ABS(i1.position-i2.position)<=1
) d
WHERE item=other_item;

View working demo on db-fiddle

Using pyspark API

from pyspark.sql import functions as F
from pyspark.sql import Window

dfResult = (
    dfTest.alias("i1")
          .join(
              dfTest.alias("i2"),
              F.abs(
                  F.col("i1.position")-F.col("i2.position")
              ) <=1,
              "inner"
          )
          .withColumn(
              "rank",
              F.rank().over(
                  Window.partitionBy("i1.item").orderBy(F.col("i2.value").desc())
              )
          )
          .where(F.col("i1.item")==F.col("i2.item"))
          .select("i1.*","rank")
          
)

Using scala

val dfResult = 
    dfTest.alias("i1")
          .join(
              dfTest.alias("i2"),
              abs(
                  col("i1.position")-col("i2.position")
              ) <=1,
              "inner"
          )
          .withColumn(
              "rank",
              rank().over(
                  Window.partitionBy("i1.item").orderBy(col("i2.value").desc())
              )
          )
          .where(col("i1.item")==col("i2.item"))
          .select("i1.*","rank")
          

Output

dfResult.show(truncate=False)
 ---- -------- ----- ---- 
|item|position|value|rank|
 ---- -------- ----- ---- 
|   a|       1|    8|   2|
|   b|       2|    9|   2|
|   c|       3|   10|   1|
|   d|       4|    7|   3|
|   e|       5|    9|   1|
 ---- -------- ----- ---- 

Let me know if this works for you.

  • Related