Check the minimum by iterating one row in a dataframe over all the rows in another dataframe

Time:10-31

Let's say I have the following two dataframes:

DF1:
 ---------- ---------- ---------- 
|     Place|Population|    IndexA|     
 ---------- ---------- ---------- 
|         A|       Int|       X_A|
|         B|       Int|       X_B|
|         C|       Int|       X_C|
 ---------- ---------- ---------- 

DF2:
 ---------- ---------- 
|      City|    IndexB|     
 ---------- ---------- 
|         D|       X_D|      
|         E|       X_E|   
|         F|       X_F|  
|      ....|      ....|
|        ZZ|      X_ZZ|
 ---------- ---------- 

The dataframes above are normally of much larger size.

For every Place in DF1, I want to find the City in DF2 at the shortest distance. The distance can be calculated from the indexes, so for every row in DF1 I have to iterate over every row in DF2 and find the shortest distance. A function is defined for the distance calculation:

val distance = udf(
      (indexA: Long, indexB: Long) => {
        h3.instance.h3Distance(indexA, indexB)
      })

I tried the following:

val output =  DF1.agg(functions.min(distance(col("IndexA"), DF2.col("IndexB"))))

The code compiles, but I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s)
H3Index#220L missing from Places#316,Population#330,IndexAx#338L in operator !Aggregate
[min(if ((isnull(IndexA#338L) OR isnull(IndexB#220L))) null else UDF(knownnotnull(IndexA#338L), knownnotnull(IndexB#220L))) AS min(UDF(IndexA, IndexB))#346].

So I suppose I am doing something wrong when iterating over each row in DF2 for a given row of DF1, but I couldn't find a solution.

What am I doing wrong? And am I heading in the right direction?

CodePudding user response:

You are getting this error because the index column you are using only exists in DF2 and not DF1 where you are attempting to perform the aggregation.

In order to make this field accessible and determine the distance from all points, you would need to

  1. Cross join DF1 and DF2 so that every index of DF1 is paired with every index of DF2
  2. Compute the distance for each pair using your udf
  3. Find the min distance per Place on this new cross-joined dataframe

This may look like :

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, udf}

val distance = udf(
      (indexA: Long, indexB: Long) => {
        h3.instance.h3Distance(indexA, indexB)
      })

val resultDF = DF1.crossJoin(DF2)
    .withColumn("distance", distance(col("IndexA"), col("IndexB")))
    // Rather than a groupBy followed by a join back to match the minimum,
    // a window min computes min_distance per Place, and the filter keeps
    // the city (or cities, on ties) at that distance.
    .withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
    .where(col("distance") === col("min_distance"))
    .drop("min_distance")

This will result in a dataframe with the columns from both dataframes and an additional column, distance.
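
To see what the three steps compute, here is a minimal sketch of the same logic on plain Scala collections instead of Spark dataframes. The sample rows are made up, and toyDistance is a hypothetical stand-in for the h3Distance call (it is not a real H3 grid distance); only the shape of the computation carries over.

```scala
// Hypothetical stand-in for h3.instance.h3Distance; NOT a real H3 grid distance.
def toyDistance(a: Long, b: Long): Int = math.abs(a - b).toInt

// Made-up sample rows: (Place, IndexA) and (City, IndexB).
val places = Seq(("A", 12L), ("B", 31L), ("C", 40L))
val cities = Seq(("D", 10L), ("E", 25L), ("F", 40L))

// Steps 1 and 2: pair every place with every city and compute the distance.
val crossed = for {
  (place, indexA) <- places
  (city, indexB)  <- cities
} yield (place, city, toyDistance(indexA, indexB))

// Step 3: per place, keep only the rows whose distance equals the group minimum.
val nearest = crossed
  .groupBy(_._1)
  .toSeq
  .flatMap { case (_, rows) =>
    val minDistance = rows.map(_._3).min
    rows.filter(_._3 == minDistance)
  }
  .sortBy(_._1)
```

As with the window version, a place that is equally close to several cities keeps one row per tied city.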

NB. Comparing every item in one dataframe to every item in another is an expensive operation. If you have the opportunity to filter early (e.g. by joining on heuristic columns, i.e. other columns that indicate a place is likely to be close to a city), this is recommended.

Let me know if this works for you.

CodePudding user response:

If you have only a few cities (around 1000 or fewer), you can avoid the crossJoin and the Window shuffle by collecting the cities into an array and then performing the distance computation for each place against this collected array:

import org.apache.spark.sql.functions.{array_min, col, struct, transform, typedLit, udf}

val citiesIndexes = DF2.select("City", "IndexB")
  .collect()
  .map(row => (row.getString(0), row.getLong(1)))

val result = DF1.withColumn(
  "City",
  array_min(
    transform(
      typedLit(citiesIndexes),
      x => struct(distance(col("IndexA"), x.getItem("_2")), x.getItem("_1"))
    )
  ).getItem("col2")
)

This piece of code works for Spark 3 and greater. If you are on a Spark version earlier than 3.0, you should replace the array_min(...).getItem("col2") part with a user-defined function.
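
As a rough sketch of what such a user-defined function could do, the core lookup can be written as a plain Scala function over the collected array. The names nearestCity and standInDistance are hypothetical, and standInDistance only stands in for h3.instance.h3Distance so that the snippet runs without H3 or Spark. In a real job you would presumably wrap the lookup with udf, passing the real distance call, and apply it to col("IndexA").

```scala
// Hypothetical stand-in for h3.instance.h3Distance; NOT a real H3 grid distance.
def standInDistance(a: Long, b: Long): Int = math.abs(a - b).toInt

// Returns the name of the city whose index is closest to indexA,
// or None when the collected array is empty.
def nearestCity(
    indexA: Long,
    cities: Array[(String, Long)],
    dist: (Long, Long) => Int
): Option[String] =
  if (cities.isEmpty) None
  else Some(cities.minBy { case (_, indexB) => dist(indexA, indexB) }._1)

// Made-up sample data in the same (City, IndexB) shape as citiesIndexes.
val sampleCities = Array(("D", 10L), ("E", 25L), ("F", 40L))
val closest = nearestCity(27L, sampleCities, standInDistance)
```

Note that minBy returns the first city in array order when several are equally close, whereas array_min on structs breaks ties by comparing the city name.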
