I have the following DataFrame (call it `dfTest`):
| item | position | value |
|---|---|---|
| a | 1 | 8 |
| b | 2 | 9 |
| c | 3 | 10 |
| d | 4 | 7 |
| e | 5 | 9 |
What I am trying to do is: given a distance limit of 1, find the rank (sorted in descending order) of `value` for each row within the subset of rows whose distances (defined by the `position` column) to this `item` are not greater than 1.
For example, the desired output (call it `dfResult`) should be:
| item | position | value | rank |
|---|---|---|---|
| a | 1 | 8 | 2 |
| b | 2 | 9 | 2 |
| c | 3 | 10 | 1 |
| d | 4 | 7 | 3 |
| e | 5 | 9 | 1 |
Explanation:

- For row `a`: since `b` is the only `item` whose distance to `a` is <= 1 (`abs(2-1)=1`), the sorted subset of `value` is `[9, 8]` and `a`'s `value` is 2nd in order, therefore the `rank` of `a` is 2.
- For row `b`: here, `a` and `c` are the `item`s with distances to `b` <= 1, so the sorted subset of `value` is `[10, 9, 8]` and `b`'s `value` is 2nd in order, therefore the `rank` of `b` is 2.
- For row `c`: here, `b` and `d` are the `item`s with distances to `c` <= 1, so the sorted subset of `value` is `[10, 9, 7]` and `c`'s `value` is 1st in order, therefore the `rank` of `c` is 1.
- ...
Any solution in Spark SQL, Scala or PySpark is appreciated!
CodePudding user response:
You may use a self-join to determine related items across the entire table (e.g. if you also had an item `f` with a position of `2`, `a`'s rank might change) and the `rank` window function to achieve your desired ranking.
Examples using spark-sql, pyspark and scala are included below:
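For reproducibility, here is a minimal PySpark setup sketch; the `SparkSession` construction and the literal data are assumptions inferred from the question, not part of the original post. It builds `dfTest` and registers a temp view so the spark-sql example can run against it:

```python
from pyspark.sql import SparkSession

# Assumed local session; adjust to your environment.
spark = SparkSession.builder.getOrCreate()

# Recreate the question's dfTest.
dfTest = spark.createDataFrame(
    [("a", 1, 8), ("b", 2, 9), ("c", 3, 10), ("d", 4, 7), ("e", 5, 9)],
    ["item", "position", "value"],
)

# Register a temp view so the spark-sql query below can reference dfTest.
dfTest.createOrReplaceTempView("dfTest")
```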
Using spark-sql
```sql
SELECT
  d.item,
  d.position,
  d.value,
  d.rank
FROM (
  SELECT
    i1.*,
    i2.item AS other_item,
    -- Rank every neighbour's value within each item's neighbourhood
    RANK() OVER (
      PARTITION BY i1.item
      ORDER BY i2.value DESC
    ) AS rank
  FROM
    dfTest i1
  INNER JOIN
    -- Self-join keeps pairs whose positions differ by at most 1
    dfTest i2 ON ABS(i1.position - i2.position) <= 1
) d
-- Keep only the row where the item is ranked against itself
WHERE item = other_item;
```
View working demo on db-fiddle
Using pyspark API
```python
from pyspark.sql import functions as F
from pyspark.sql import Window

dfResult = (
    dfTest.alias("i1")
    # Self-join keeps pairs whose positions differ by at most 1
    .join(
        dfTest.alias("i2"),
        F.abs(F.col("i1.position") - F.col("i2.position")) <= 1,
        "inner",
    )
    # Rank every neighbour's value within each item's neighbourhood
    .withColumn(
        "rank",
        F.rank().over(
            Window.partitionBy("i1.item").orderBy(F.col("i2.value").desc())
        ),
    )
    # Keep only the row where the item is ranked against itself
    .where(F.col("i1.item") == F.col("i2.item"))
    .select("i1.*", "rank")
)
```
Using scala
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val dfResult =
  dfTest.alias("i1")
    // Self-join keeps pairs whose positions differ by at most 1
    .join(
      dfTest.alias("i2"),
      abs(col("i1.position") - col("i2.position")) <= 1,
      "inner"
    )
    // Rank every neighbour's value within each item's neighbourhood
    .withColumn(
      "rank",
      rank().over(
        Window.partitionBy("i1.item").orderBy(col("i2.value").desc())
      )
    )
    // Keep only the row where the item is ranked against itself
    // (note: Column equality in Scala uses ===, not ==)
    .where(col("i1.item") === col("i2.item"))
    .select("i1.*", "rank")
```
Output
```
dfResult.show(truncate=False)

+----+--------+-----+----+
|item|position|value|rank|
+----+--------+-----+----+
|   a|       1|    8|   2|
|   b|       2|    9|   2|
|   c|       3|   10|   1|
|   d|       4|    7|   3|
|   e|       5|    9|   1|
+----+--------+-----+----+
```
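As a side note, if you would rather avoid the self-join, one possible alternative (not the approach above) is a range-frame window: ranking functions such as `rank()` do not accept custom frames, but you can collect each row's neighbourhood with `collect_list` over a `rangeBetween(-1, 1)` frame and derive the rank by counting strictly greater values. A sketch, assuming Spark 3.1+ for the Python higher-order `F.filter`:

```python
from pyspark.sql import functions as F
from pyspark.sql import Window

# Range frame: all rows whose position is within +/-1 of the current row.
w = Window.orderBy("position").rangeBetween(-1, 1)

dfAlt = (
    dfTest
    # Gather every value in the current row's neighbourhood (self included).
    .withColumn("neighbor_values", F.collect_list("value").over(w))
    # Competition rank = 1 + count of neighbourhood values strictly greater
    # than this row's value (matches RANK() semantics, ties share a rank).
    .withColumn(
        "rank",
        F.size(F.filter("neighbor_values", lambda v: v > F.col("value"))) + 1,
    )
    .drop("neighbor_values")
)
```

This replaces the join with a single sort over `position`, but it only works while the distance limit is expressible as a numeric range on one ordering column.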
Let me know if this works for you.