why mapPartitions does not see my val - SCALA/SPARK?

Time:12-12

I define the vals like this:

  val config = Config(args) 
  val product_type = config.product_type 

Then I pass "AA" as product_type. My code is:

  val scores = df.mapPartitions(iterator => {
    val inputStream =
      if(product_type == "AA" ) {
        getClass().getClassLoader().getResourceAsStream("my_aa.hdf5")
      }
      else {
        getClass().getClassLoader().getResourceAsStream("my_bb.hdf5")
      } 
    val multiLayerNetwork: MultiLayerNetwork = KerasModelImport.importKerasSequentialModelAndWeights(inputStream, false)
    val wrapped: ParallelInference = new ParallelInference.Builder(multiLayerNetwork).build()
    val res = iterator.map(row => {
      wrapped.output(row).toDoubleVector
    })
    res
  })

But inputStream is loaded from "my_bb.hdf5", which is not correct: that value comes from the else branch. Why can't my product_type variable be read inside mapPartitions?

I printed product_type before this code and checked it: it is "AA".

Answer:

It happens because I get this variable from an argument passed in spark-submit.sh, and it cannot be read inside mapPartitions: on the executors it does not hold the value it has on the driver.

It works if the check is moved outside mapPartitions, so that it runs on the driver:

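  // The if runs on the driver, where product_type is "AA",
  // so the executors never need to read product_type.
  val scores =
    if (product_type == "AA") {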
      df.mapPartitions(iterator => {
        val inputStream = getClass().getClassLoader().getResourceAsStream("AA.hdf5")
        val multiLayerNetwork: MultiLayerNetwork = KerasModelImport.importKerasSequentialModelAndWeights(inputStream, false)
        val wrapped: ParallelInference = new ParallelInference.Builder(multiLayerNetwork).build()
        val res = iterator.map(row => {
          wrapped.output(row).toDoubleVector
        })
        res
      })
    } else {
      df.mapPartitions(iterator => {
        val inputStream = getClass().getClassLoader().getResourceAsStream("BB.hdf5")
        val multiLayerNetwork: MultiLayerNetwork = KerasModelImport.importKerasSequentialModelAndWeights(inputStream, false)
        val wrapped: ParallelInference = new ParallelInference.Builder(multiLayerNetwork).build()
        val res = iterator.map(row => {
          wrapped.output(row).toDoubleVector
        })
        res
      })
    }
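The two branches above differ only in the resource name, so they can be collapsed into one. A minimal sketch, assuming the underlying cause is that product_type is a field of an enclosing object that is not initialized on the executors (a common case when the driver object extends scala.App; the Spark Quick Start advises defining a main() method instead). The idea is to resolve the file name into a local String on the driver, so the closure captures the value itself. To stay self-contained it uses neither Spark nor DL4J: CaptureDemo, chooseResource, roundTrip and scorePartition are hypothetical names, the partition function only tags each row with the resource name where the real code would load the model, and the Java serialization round trip only approximates how Spark ships a task to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CaptureDemo {
  // Hypothetical helper: choose the model file once, on the driver.
  def chooseResource(productType: String): String =
    if (productType == "AA") "my_aa.hdf5" else "my_bb.hdf5"

  // Serialize and deserialize a value, roughly what happens when Spark
  // sends the mapPartitions function to an executor.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Stands in for config.product_type; assumed to be read inside main(),
    // not in the body of an object that extends App.
    val productType = "AA"

    // Local String: the closure below captures this value,
    // not a field of the enclosing object.
    val resourceName = chooseResource(productType)

    // Stand-in for the function passed to mapPartitions.
    val scorePartition: Iterator[Int] => Iterator[String] =
      rows => rows.map(row => s"$resourceName:$row")

    val shipped = roundTrip(scorePartition)
    println(shipped(Iterator(1, 2)).toList)
  }
}
```

Running it prints `List(my_aa.hdf5:1, my_aa.hdf5:2)`: the deserialized function still sees the value chosen on the driver. In the Spark job, the same idea means computing the file name before `df.mapPartitions` and calling `getResourceAsStream(resourceName)` inside it, which also removes the duplicated branch.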