I have the following two dataframes:
DF1:
+----------+----------+---------+
|     Place|       lat|      lon|
+----------+----------+---------+
|         A|       X_A|      Y_A|
|         B|       X_B|      Y_B|
|         C|       X_C|      Y_C|
+----------+----------+---------+
DF2:
+----------+----------+---------+
|      City|       lat|      lon|
+----------+----------+---------+
|         D|       X_D|      Y_D|
|         E|       X_E|      Y_E|
|         F|       X_F|      Y_F|
|         G|       X_G|      Y_G|
|         H|       X_H|      Y_H|
|         I|       X_I|      Y_I|
+----------+----------+---------+
What I want to obtain is the shortest euclidean distance from each Place (from DF1) to a City (from DF2).
So what I have to do is: first calculate the distances from Place A to the cities D through I, and then pick the shortest distance from those calculations.
So the pseudocode is something like the snippet below, containing a nested for loop:
for (place ranging from A to C) {
    X1 = place.lat
    Y1 = place.lon
    for (city ranging from D to I) {
        X2 = city.lat
        Y2 = city.lon
        d.append(sqrt((X2 - X1)^2 + (Y2 - Y1)^2))
    }
    res[place] = min(d)
}
where res[] is actually a column in a dataframe containing the shortest distances.
So what I first thought of was using a crossJoin() between the two dataframes, but then I don't know how I should continue after that step.
So, can anyone help me out?
CodePudding user response:
Once you've done your cross join, you can compute the euclidean distance with the hypot function, store it in a distance column with the withColumn dataset method, and then get the minimum value of that column by grouping by the Place column and aggregating the distance column with the min aggregate function.
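For reference, hypot(a, b) computes sqrt(a² + b²), which is exactly the euclidean distance formula. As a tiny sanity-check sketch (assuming an active SparkSession named spark; lit and hypot both come from org.apache.spark.sql.functions):
import org.apache.spark.sql.functions.{hypot, lit}

// hypot(3, 4) = sqrt(3^2 + 4^2) = 5.0
spark.range(1).select(hypot(lit(3.0), lit(4.0)).as("d")).show()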
Here is the complete code:
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.hypot

df1.crossJoin(df2) // pair every Place with every City
  // euclidean distance: hypot(dLat, dLon) = sqrt(dLat^2 + dLon^2)
  .withColumn("distance", hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
  .groupBy("Place") // one result row per Place
  .agg(functions.min("distance").as("min_distance")) // keep only the shortest distance
You will get a dataframe with two columns, similar to the following one:
+-----+-----------------+
|Place|min_distance     |
+-----+-----------------+
|B    |2.68700576850888 |
|C    |2.545584412271571|
|A    |2.82842712474619 |
+-----+-----------------+
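If you want to run the whole thing end to end, here is a minimal, self-contained sketch; the local SparkSession setup and the sample coordinates are my own illustrative assumptions, not values from the original question:
import org.apache.spark.sql.{functions, SparkSession}
import org.apache.spark.sql.functions.hypot

val spark = SparkSession.builder().master("local[*]").appName("nearest-city").getOrCreate()
import spark.implicits._

// Hypothetical coordinates, purely for illustration
val df1 = Seq(("A", 1.0, 1.0), ("B", 2.0, 5.0), ("C", 6.0, 3.0)).toDF("Place", "lat", "lon")
val df2 = Seq(("D", 3.0, 3.0), ("E", 0.0, 4.0), ("F", 5.0, 0.0),
              ("G", 4.0, 6.0), ("H", 7.0, 7.0), ("I", 2.0, 2.0)).toDF("City", "lat", "lon")

df1.crossJoin(df2)
  .withColumn("distance", hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
  .groupBy("Place")
  .agg(functions.min("distance").as("min_distance"))
  .show(false)
One thing to keep in mind about this design: the cross join materialises every Place/City pair, so the intermediate dataset grows as |DF1| × |DF2|. That is perfectly fine at this scale, but worth knowing before applying it to large dataframes.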