Hi everyone, I'm new to Spark with Scala. I want to find the nearest values by partition using Spark Scala. My input is something like this:
For the first group, for example, value1 (3) falls between 2 and 7 in the value2 column.
+---+------+------+
|id |value1|value2|
+---+------+------+
|1  |3     |1     |
|1  |3     |2     |
|1  |3     |7     |
|2  |4     |2     |
|2  |4     |3     |
|2  |4     |8     |
|3  |5     |3     |
|3  |5     |6     |
|3  |5     |7     |
|3  |5     |8     |
+---+------+------+
My output should look like this:
+---+------+------+
|id |value1|value2|
+---+------+------+
|1  |3     |2     |
|1  |3     |7     |
|2  |4     |3     |
|2  |4     |8     |
|3  |5     |3     |
|3  |5     |6     |
+---+------+------+
Can someone guide me on how to solve this, please?
CodePudding user response:
Since you appear to want to learn, instead of providing a code answer I've given you pseudo code and references so you can find the solution for yourself.
- Group the elements (select id, value1) and aggregate on value2 with collect_list, so that all the value2 values of a group are collected into an array.
- Select id, and add (concat) value1 to the collect_list array, then sort the array.
- Find (array_position) value1 in the array, then splice the array, keeping the value just before and the value just after the position returned by array_position.
- If the array has fewer than 3 elements, do error handling.
- Now the last value and the first value of the spliced array are your 'closest numbers'. (A rough sketch of these steps is shown below.)
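For reference, here is a minimal, untested sketch of those steps. It assumes a DataFrame df shaped like the input above; note that array_position returns the first match, so duplicates of value1 inside value2 would need extra care:

import org.apache.spark.sql.functions._

val bracketed = df
  .groupBy("id", "value1")
  // collect all value2 values of the group into one array
  .agg(collect_list("value2").as("values"))
  // append value1 to the array, then sort it
  .withColumn("values", array_sort(concat(col("values"), array(col("value1")))))
  // find value1's (1-based) position in the sorted array
  .withColumn("pos", array_position(col("values"), col("value1")).cast("int"))
  // its neighbors are the nearest values below and above;
  // the guards cover value1 being the smallest or largest element
  .withColumn("before", when(col("pos") > 1, element_at(col("values"), col("pos") - 1)))
  .withColumn("after", when(col("pos") < size(col("values")), element_at(col("values"), col("pos") + 1)))
  .select("id", "value1", "before", "after")

This yields one row per group with the bracketing values; exploding array(before, after) back into rows would reproduce the output shape from the question.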
CodePudding user response:
You will need window functions for this.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Ordered window: defines previous/next neighbors within each (id, value1) group
val window = Window
  .partitionBy("id", "value1")
  .orderBy(asc("value2"))

// Unordered window over the same groups: min() over the ordered window
// would only compute a running minimum, so the group minimum needs this one
val group = Window.partitionBy("id", "value1")

val result = df
  .withColumn("prev", lag("value2", 1).over(window))
  .withColumn("next", lead("value2", 1).over(window))
  .withColumn("dist_prev", col("value2").minus(col("prev")))
  .withColumn("dist_next", col("next").minus(col("value2")))
  .withColumn("min", min(col("dist_prev")).over(group))
  .filter(col("dist_prev") === col("min") || col("dist_next") === col("min"))
  .drop("prev", "next", "dist_prev", "dist_next", "min")
I haven't tested it, so treat it as an illustration of the concept rather than a working, ready-to-use example.
Here is what's going on:
- First, create a window that describes your grouping rule: we want the rows grouped by the first two columns, and sorted by the third one within each group.
- Next, add prev and next columns to the dataframe, containing the value of the value2 column from the previous and the next row within the group, respectively. (prev will be null for the first row in the group, and next will be null for the last row; that is OK.)
- Add dist_prev and dist_next to contain the distance between value2 and the prev and next values, respectively. (Note that dist_prev for each row will have the same value as dist_next for the previous row.)
- Find the minimum value of dist_prev within each group, and add it as the min column. (Note that the minimum value of dist_next is the same by construction, so we only need one column here.)
- Filter the rows, selecting those that have the minimum value in either dist_next or dist_prev. This finds the tightest pair, unless there are multiple rows with the same distance from each other; that case was not accounted for in your question, so we don't know what behavior you want. This implementation will simply return all of those rows.
- Finally, drop all the extra columns that were added to the dataframe, to return it to its original shape.
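To experiment with either approach, you can build the sample input from the question like this (assuming an active SparkSession bound to a variable named spark):

import spark.implicits._

val df = Seq(
  (1, 3, 1), (1, 3, 2), (1, 3, 7),
  (2, 4, 2), (2, 4, 3), (2, 4, 8),
  (3, 5, 3), (3, 5, 6), (3, 5, 7), (3, 5, 8)
).toDF("id", "value1", "value2")

Then run the snippet above and inspect the surviving rows with result.show().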