Let's say I have the following two dataframes:
DF1:
+-----+----------+------+
|Place|Population|IndexA|
+-----+----------+------+
|    A|       Int|   X_A|
|    B|       Int|   X_B|
|    C|       Int|   X_C|
+-----+----------+------+
DF2:
+----+------+
|City|IndexB|
+----+------+
|   D|   X_D|
|   E|   X_E|
|   F|   X_F|
| ...|   ...|
|  ZZ|  X_ZZ|
+----+------+
The dataframes above are normally of much larger size.
I want to determine, for every Place in DF1, which City in DF2 is at the shortest distance. The distance can be calculated based on the index: for every row in DF1, I have to iterate over every row in DF2 and look for the shortest distance based on calculations with the indexes. For the distance calculation there is a function defined:
val distance = udf(
  (indexA: Long, indexB: Long) => {
    h3.instance.h3Distance(indexA, indexB)
  })
I tried the following:
val output = DF1.agg(functions.min(distance(col("IndexA"), DF2.col("IndexB"))))
The code compiles, but I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s)
H3Index#220L missing from Places#316,Population#330,IndexAx#338L in operator !Aggregate
[min(if ((isnull(IndexA#338L) OR isnull(IndexB#220L))) null else
UDF(knownnotnull(IndexA#338L), knownnotnull(IndexB#220L))) AS min(UDF(IndexA, IndexB))#346].
So I suppose I am doing something wrong when iterating over each row in DF2 while taking one row from DF1, but I couldn't find a solution.
What am I doing wrong? Am I heading in the right direction?
CodePudding user response:
You are getting this error because the index column you are using only exists in DF2, not in DF1, where you are attempting to perform the aggregation.
In order to make this field accessible and determine the distance from all points, you would need to:

- Cross join DF1 and DF2 so that every index of DF1 is matched with every index of DF2
- Determine the distance using your udf
- Find the min distance per Place on this cross-joined result

This may look like:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val distance = udf(
  (indexA: Long, indexB: Long) => {
    h3.instance.h3Distance(indexA, indexB)
  })

val resultDF = DF1.crossJoin(DF2)
  // compute the distance for every Place/City pair
  .withColumn("distance", distance(col("IndexA"), col("IndexB")))
  // instead of a groupBy followed by a join of the aggregated min distance
  // back to the initial df, use a window function to compute the min
  // distance within each group (partitioned by Place) ...
  .withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
  // ... and keep only the City at the minimum distance from each Place
  .where(col("distance") === col("min_distance"))
  .drop("min_distance")
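For comparison, here is a minimal sketch of the groupBy alternative mentioned in the comments above; the intermediate val crossed is a name introduced here for illustration:

val crossed = DF1.crossJoin(DF2)
  .withColumn("distance", distance(col("IndexA"), col("IndexB")))

// aggregate the min distance per Place, then join it back to keep the matching rows
val minPerPlace = crossed.groupBy("Place").agg(min("distance").as("min_distance"))
val resultAlt = crossed.join(minPerPlace, Seq("Place"))
  .where(col("distance") === col("min_distance"))
  .drop("min_distance")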
Either approach results in a dataframe with the columns from both dataframes and an additional column, distance.
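To inspect just the nearest city per place, select the relevant columns; note that if two cities tie for the minimum distance, the filter keeps both rows:

// one row per Place with its nearest City (ties yield multiple rows)
resultDF.select("Place", "City", "distance").show()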
NB. Your current approach, which compares every item in one df to every item in another df, is an expensive operation. If you have the opportunity to filter early (e.g. by joining on heuristic columns, i.e. other columns which may indicate that a place is likely to be close to a city), this is recommended; a sketch follows.
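As an illustration of filtering early, assuming (hypothetically, this column does not exist in your frames) that both frames carried a coarse Region column that rules out distant Place/City pairs, an equi-join on it would prune pairs before any distance is computed:

// hypothetical: "Region" stands in for any heuristic column shared by both frames
val prunedDF = DF1.join(DF2, Seq("Region"))
  .withColumn("distance", distance(col("IndexA"), col("IndexB")))
  .withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
  .where(col("distance") === col("min_distance"))
  .drop("min_distance")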
Let me know if this works for you.