Doing calculations between columns of different dataframes where something like a for loop is included


I have the following two dataframes:

DF1:
+----------+----------+---------+
|     Place|       lat|      lon|
+----------+----------+---------+
|         A|       X_A|      Y_A|
|         B|       X_B|      Y_B|
|         C|       X_C|      Y_C|
+----------+----------+---------+

DF2:
+----------+----------+---------+
|      City|       lat|      lon|
+----------+----------+---------+
|         D|       X_D|      Y_D|
|         E|       X_E|      Y_E|
|         F|       X_F|      Y_F|
|         G|       X_G|      Y_G|
|         H|       X_H|      Y_H|
|         I|       X_I|      Y_I|
+----------+----------+---------+

What I want to obtain is the shortest Euclidean distance from each Place (in DF1) to a City (in DF2).

So what I have to do is first calculate the distance from Place A to each of the cities D through I, and then pick the shortest of those distances.

The pseudocode is shown below and contains a nested for loop:

for (place ranging from A to C) {
    X1 = place.lat
    Y1 = place.lon
    d = empty list
    for (city ranging from D to I) {
        X2 = city.lat
        Y2 = city.lon
        d.append(sqrt((X2 - X1)^2 + (Y2 - Y1)^2))
    }
    res[place] = min(d)
}

where res[] ends up as a column in a dataframe containing the shortest distance for each place (see the sketch below).
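
For illustration, here is a minimal plain-Scala sketch of that nested loop; the Point case class and all coordinates are made up for the example:

case class Point(name: String, lat: Double, lon: Double)

// Hypothetical sample data standing in for DF1 and DF2
val places = Seq(Point("A", 0.0, 0.0), Point("B", 1.0, 2.0), Point("C", 3.0, 1.0))
val cities = Seq(Point("D", 2.0, 2.0), Point("E", 4.0, 0.0), Point("F", 1.0, 3.0))

// For each place, compute the distance to every city and keep the minimum
val res: Map[String, Double] = places.map { p =>
  p.name -> cities.map(c => math.hypot(c.lat - p.lat, c.lon - p.lon)).min
}.toMap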

My first thought was to use a crossJoin() between the two dataframes, but I don't know how to continue after that step.

So can anyone help me out?

CodePudding user response:

Once you've done your cross join, you can compute the Euclidean distance using the hypot function and store it in a distance column with the withColumn dataset method. You can then get the minimum value of this column per place by grouping by the Place column and aggregating the distance column with the min aggregate function.

Here is the complete code:

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.hypot

df1.crossJoin(df2)                               // pair every Place with every City
  .withColumn("distance",                        // Euclidean distance for each pair
    hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
  .groupBy("Place")                              // one row per Place
  .agg(functions.min("distance").as("min_distance"))  // keep the shortest distance

You will get a dataframe with two columns, similar to the following one:

+-----+-----------------+
|Place|min_distance     |
+-----+-----------------+
|B    |2.68700576850888 |
|C    |2.545584412271571|
|A    |2.82842712474619 |
+-----+-----------------+
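
For completeness, here is a self-contained way to try that snippet locally; the session setup and the sample coordinates are assumptions added for the demo, not part of the original question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.hypot

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical coordinates for the example
val df1 = Seq(("A", 0.0, 0.0), ("B", 1.0, 2.0), ("C", 3.0, 1.0)).toDF("Place", "lat", "lon")
val df2 = Seq(("D", 2.0, 2.0), ("E", 4.0, 0.0), ("F", 1.0, 3.0)).toDF("City", "lat", "lon")

df1.crossJoin(df2)
  .withColumn("distance", hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
  .groupBy("Place")
  .agg(functions.min("distance").as("min_distance"))
  .show(false)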