Iterator inside state function coming as Empty

Time:03-09

I have the case classes shown below, and I am generating test data with RateStreamSource, which gives me a Dataset. I then group the Dataset with groupByKey and call mapGroupsWithState.

However, the state function updateRateAnother contains some logic and prints the iterator. The iterator always appears to be empty inside the method, so my logic does not work.

The following is a minimal reproducible example:

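import java.sql.Timestamp

import scala.util.Random

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Employee(id: String, value: Long)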
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)

object ResourceConfigConsolidator {

   def main(args: Array[String]): Unit = {

       val sparkSession = SparkSession
                         .builder()
                         .appName("TestJob")
                         .getOrCreate()

      import sparkSession.implicits._
      val rate = 2
      val randoms = List(10, 20, 30, 40, 50, 60, 70)
      def randomElement = Random.shuffle(randoms).head
      val rcConfigDS = sparkSession
                        .readStream
                        .format("rate") // <-- use RateStreamSource
                        .option("rowsPerSecond", rate)
                        .load()
                        .as[Rate].filter(_.value % 40 == 0).map {
                          r => Rate2(r.timestamp, r.value, randomElement)
                        }


  def updateRateAnother(key: Int, values: Iterator[Rate2],
                        state: GroupState[Employee]): Option[Employee] = {
    println("key is here ::" + key)
    if (state.hasTimedOut) {
      // We've timed out, let's extract the state and send it down the stream
      state.remove()
      state.getOption
    } else {
      println("the iterating values ::::" + values.toList.mkString(" , \n"))
      println("hello length ::::" + values.length)
      if (!state.exists) {
        if (values.length == 0) {
          None
        } else {
          val latestValue = values.toList.maxBy(_.value)
          val employee = Employee(latestValue.value.toString, latestValue.value)
          state.update(employee)
          Some(employee)
        }
      } else {
        if (values.isEmpty) {
          val currentState = state.get
          Some(currentState)
        } else {
          val latestValue = values.toList.maxBy(_.value)
          val currentState = state.get
          val updated = currentState.copy(latestValue.value.toString, latestValue.value)
          state.update(updated)
          Some(updated)
        }
      }
    }
  }

  val res: Dataset[Employee] = rcConfigDS
    .groupByKey(_.age)
    .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother)
    .flatMap(emp => emp)

  res.writeStream.format("console")
  .outputMode(OutputMode.Update())
  .option("truncate", value = false)
  .option("checkpointLocation", "checkpoint1")
  .start()

 }
}

Since I am grouping by age, there should be at least one object in the iterator. Am I right in saying this? Why is the iterator empty?

CodePudding user response:

Are you sure it is empty the first time you print it? That would be the only surprising part. An iterator can be traversed only once, so after the first call to values.toList it is exhausted and every later call sees it as empty. Assign the result of toList to a variable and discard the iterator.
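A minimal sketch of that behaviour in plain Scala, without Spark. The object name IteratorOnce and the helper describe are made up for illustration. It shows the first traversal draining the iterator, and then the suggested fix of materializing it once into a List:

```scala
object IteratorOnce {
  // Hypothetical helper: materialize the iterator exactly once,
  // then use only the resulting List for every later operation.
  def describe(values: Iterator[Int]): (String, Int) = {
    val buffered = values.toList
    (buffered.mkString(" , "), buffered.length)
  }

  def main(args: Array[String]): Unit = {
    val it = Iterator(10, 20, 30)
    println(it.toList)  // the first traversal consumes every element
    println(it.hasNext) // the iterator is now exhausted

    // The List can be read as many times as needed.
    println(describe(Iterator(10, 20, 30)))
  }
}
```

In the question's code, the first println already calls values.toList, so the later values.length and values.isEmpty checks run against an exhausted iterator.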

Better yet, change your logic so that you only need one pass, and get rid of toList (you can call maxBy on Iterator directly ... but only once). The idea is to avoid loading all the data into memory at once when dealing with a large dataset.
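One possible shape for the single-pass version, as a sketch rather than a drop-in replacement. The object LatestEmployee and the function latest are hypothetical names, and Rate2 is simplified here (no timestamp) so that the example runs without Spark. hasNext does not consume any element, so it is safe to check before calling maxBy once:

```scala
object LatestEmployee {
  // Simplified stand-ins for the question's case classes (assumption: no timestamp).
  final case class Employee(id: String, value: Long)
  final case class Rate2(value: Long, age: Int)

  // Hypothetical helper: computes the new state from the rows of one group
  // and the previous state, traversing the iterator at most once.
  def latest(values: Iterator[Rate2], previous: Option[Employee]): Option[Employee] =
    if (values.hasNext) {
      val newest = values.maxBy(_.value) // single traversal, no toList
      Some(Employee(newest.value.toString, newest.value))
    } else {
      previous // no new rows: keep whatever state already existed
    }
}
```

Inside updateRateAnother, this could be called as latest(values, state.getOption), with the result passed to state.update when it is defined. That wiring is an assumption about how you would integrate it, and it has not been tested against Spark here.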
