Scala partial function does not work with spark mapPartitionsWithIndex

Time:10-04

When calling the Spark API mapPartitionsWithIndex, I passed a partial function written as a case block, and it works fine:

val numbersRDD = sc.parallelize(numbers)
numbersRDD.mapPartitionsWithIndex { case x: (Int, Iterator[Int]) => Iterator((x._1, x._2.size)) }

But when I explicitly declare it as a PartialFunction, like below:

val dosomething: PartialFunction[(Int, Iterator[Int]), Iterator[(Int, Any)]] = {
  case x: (Int, Iterator[Int]) => Iterator((x._1, x._2.size))
}
numbersRDD.mapPartitionsWithIndex { dosomething }

the compiler starts complaining. I am not sure what I am doing wrong; I might be missing something here.

CodePudding user response:

Because mapPartitionsWithIndex accepts a Function, not a PartialFunction. In your first snippet the case block is written like a partial function, but it is not explicitly typed as PartialFunction, so the compiler types it from the expected parameter type, which is Function2.

In the second snippet you declared it as a PartialFunction, which the compiler cannot convert into a Function2, hence the compilation error.
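To illustrate how the expected type decides this, here is a minimal sketch in plain Scala (no Spark needed). The names ExpectedTypeDemo, asFunction2 and asPartial are made up for the example, and the result type is narrowed to Iterator[(Int, Int)] for simplicity.

```scala
object ExpectedTypeDemo {
  // Expected type is a Function2, so the compiler expands the case block
  // into a two-argument function that matches on the pair of arguments.
  val asFunction2: (Int, Iterator[Int]) => Iterator[(Int, Int)] = {
    case (idx, it) => Iterator((idx, it.size))
  }

  // Same case block, but the expected type is PartialFunction, which takes
  // ONE argument: the tuple (Int, Iterator[Int]).
  val asPartial: PartialFunction[(Int, Iterator[Int]), Iterator[(Int, Int)]] = {
    case (idx, it) => Iterator((idx, it.size))
  }

  def main(args: Array[String]): Unit = {
    println(asFunction2(0, Iterator(1, 2, 3)).toList)  // called with two arguments
    println(asPartial((0, Iterator(1, 2, 3))).toList)  // called with one tuple
  }
}
```

Both values are built from the same case block; only the declared type differs, and that declared type is what the compiler checks against the method parameter.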

Why do you want to pass a PartialFunction? The method signature of mapPartitionsWithIndex clearly expects a Function and not a PartialFunction.

  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

A PartialFunction is ALWAYS a Function1, i.e. it has type A => B. filter accepts a Function1, so passing a partial function to filter compiles, since that partial function is also something like A => Boolean. mapPartitionsWithIndex, however, expects a Function2, hence the compilation error. You can try this out in the Scala REPL:

// Note that the result is a Function1 (<function1>), whereas mapPartitionsWithIndex expects a Function2.
scala>   val dosomething: PartialFunction[(Int, Iterator[Int]), Iterator[(Int, Any)]] = {
     |     case x: (Int, Iterator[Int]) => Iterator((x._1, x._2.size))
     |   }
val dosomething: PartialFunction[(Int, Iterator[Int]),Iterator[(Int, Any)]] = <function1>


// This is also a Function1, so filter accepts it. It compiles, but it still throws
// a MatchError at runtime for any input the case does not cover (here, x <= 10).
scala>   val dosomething1: PartialFunction[Int, Boolean] = {
     |     case x: Int if x > 10 => false
     |   }
val dosomething1: PartialFunction[Int,Boolean] = <function1>
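If you do already have a PartialFunction and want to reuse it, one option is to adapt it to a two-argument function at the call site. The sketch below is an assumption-laden illustration, not Spark code: runOnPartition is a hypothetical stand-in that merely demands the same Function2 shape as mapPartitionsWithIndex, and the result type is narrowed to Iterator[(Int, Int)]. Function.untupled comes from the Scala standard library.

```scala
object AdaptPartialFunction {
  val dosomething: PartialFunction[(Int, Iterator[Int]), Iterator[(Int, Int)]] = {
    case (idx, it) => Iterator((idx, it.size))
  }

  // Hypothetical stand-in, NOT Spark's API: it only mimics the Function2
  // parameter shape that mapPartitionsWithIndex expects.
  def runOnPartition(index: Int, data: Iterator[Int])(
      f: (Int, Iterator[Int]) => Iterator[(Int, Int)]): List[(Int, Int)] =
    f(index, data).toList

  def main(args: Array[String]): Unit = {
    // Option 1: wrap the partial function in an explicit two-argument lambda.
    val viaLambda =
      runOnPartition(0, Iterator(1, 2, 3))((i, it) => dosomething((i, it)))

    // Option 2: let the standard library turn the tuple-taking function
    // into a two-argument one.
    val viaUntupled =
      runOnPartition(1, Iterator(4, 5))(Function.untupled(dosomething))

    println(viaLambda)   // List((0,3))
    println(viaUntupled) // List((1,2))
  }
}
```

With the RDD from the question, the equivalent call would presumably look like numbersRDD.mapPartitionsWithIndex((i, it) => dosomething((i, it))). Either way, the adapted function still throws a MatchError for any input the partial function does not cover.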

