Understanding spark sql custom join expression which contains map and reduce functions

Time:10-09

I am new to Big Data and am going through an existing code base, trying to understand a specific piece of code. I am stuck on a join expression used for joining two dataframes, where reduce is used as part of the expression. Here is the code containing the join expression:

 def joinOnMultipleColumns(leftDF: Dataset[Row], rightDF: Dataset[Row],
     leftColumns: List[String], rightColumns: List[String]
 ): DataFrame = {
   // Both leftColumns and rightColumns are of the same length
   val joinExpression = leftColumns
     .zip(rightColumns)
     .map { case (c1, c2) => col(c1) === col(c2) }
     .reduce(_ && _)  // -----> what does the map and reduce part mean here?

   rightDF.cache.show
   leftDF.join(rightDF, joinExpression)
 }

Please let me know if I need to provide any further information.

My assumption is that the function receives two column lists, [t1_col1, t1_col2] and [t2_col1, t2_col2], along with the two dataframes:

  • zip would result in (t1_col1, t2_col1), (t1_col2, t2_col2)
  • map and reduce combined will create a join expression of the form col1 === col2, but I'm not sure what exactly is happening, and my assumption might be completely wrong too

Can someone please help me understand what the code actually does?

CodePudding user response:

map is a higher-order function responsible for transforming the values in a collection by applying the function passed to map to each value. Let's dig into your code:

  1. Assume you have the following values: leftColumns = ["col1_1", "col1_2"] and rightColumns = ["col2_1", "col2_2"]

  2. zip

leftColumns
.zip(rightColumns)

In this step we zip the two initial collections of strings into one collection of tuples: [("col1_1", "col2_1"), ("col1_2", "col2_2")]
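The zip step can be tried out on plain Scala lists, using the example values from step 1:

```scala
// zip pairs up elements of two lists position by position.
val leftColumns  = List("col1_1", "col1_2")
val rightColumns = List("col2_1", "col2_2")

val zipped = leftColumns.zip(rightColumns)
// zipped == List(("col1_1", "col2_1"), ("col1_2", "col2_2"))
```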

  3. map
.map { case (c1, c2) => col(c1) === col(c2) }

As I said earlier, map applies a function to each element in the collection. Here each element is a tuple of (String, String), and the function is col(left) === col(right). So we're transforming a List[(String, String)] into a List[Column], because === applied to a Column returns a Column (see === in the Scaladoc).

Finally we get: [col("col1_1") === col("col2_1"), col("col1_2") === col("col2_2")]
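To see the same transformation without needing a SparkSession, here is a pure-Scala analogy of the map step that builds strings of the form "left = right" instead of Spark Columns (col(c1) === col(c2) does the same shape of work, just producing a Column instead of a String):

```scala
// Analogy only: map turns each (String, String) tuple into one value.
// In the real code the result values are Spark Columns, not Strings.
val zipped = List(("col1_1", "col2_1"), ("col1_2", "col2_2"))

val conditions = zipped.map { case (c1, c2) => s"$c1 = $c2" }
// conditions == List("col1_1 = col2_1", "col1_2 = col2_2")
```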

  4. reduce
.reduce(_ && _)

reduce is responsible for folding a collection of values into one value. In this case we're folding the List[Column] into a single Column by applying && (the AND operator, which returns a Column when applied to Columns; see && in the Scaladoc).
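The folding behaviour of reduce is easiest to see with plain Booleans, where && is the ordinary logical AND:

```scala
// reduce combines elements left to right:
// List(a, b, c).reduce(_ && _) is evaluated as (a && b) && c.
val flags = List(true, true, false)

val allTrue = flags.reduce(_ && _)
// allTrue == false, because one element is false
```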

So finally we get: col("col1_1") === col("col2_1") && col("col1_2") === col("col2_2"), which is the combined condition for joining the two dataframes.
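Putting the pieces together: for the two example column pairs, the zip/map/reduce pipeline builds exactly the condition you would otherwise write by hand (a sketch using the example names from step 1; it assumes a SparkSession and the two dataframes are already in scope):

```scala
import org.apache.spark.sql.functions.col

// The same join expression, written out manually for two column pairs.
// With the zip/map/reduce version, this generalises to any number of pairs.
val joinExpression =
  col("col1_1") === col("col2_1") && col("col1_2") === col("col2_2")

// leftDF.join(rightDF, joinExpression)  // same result as the original helper
```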
