I have the case classes shown below, and I am generating test data with RateStreamSource, which gives me a Dataset. I group the Dataset with groupByKey and then call mapGroupsWithState.
Inside the state function updateRateAnother there is some logic, and I print the iterator. The iterator always arrives empty in that method, so my logic does not work.
The following is a minimal reproducible example:
import java.sql.Timestamp

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

import scala.util.Random

case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)

object ResourceConfigConsolidator {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("TestJob")
      .getOrCreate()
    import sparkSession.implicits._

    val rate = 2
    val randoms = List(10, 20, 30, 40, 50, 60, 70)
    def randomElement = Random.shuffle(randoms).head

    val rcConfigDS = sparkSession
      .readStream
      .format("rate") // <-- use RateStreamSource
      .option("rowsPerSecond", rate)
      .load()
      .as[Rate]
      .filter(_.value % 40 == 0)
      .map(r => Rate2(r.timestamp, r.value, randomElement))

    def updateRateAnother(key: Int, values: Iterator[Rate2], state: GroupState[Employee]): Option[Employee] = {
      println("key is here ::" + key)
      if (state.hasTimedOut) {
        // We've timed out, lets extract the state and send it down the stream
        state.remove()
        state.getOption
      } else {
        println("the iterating values ::::" + values.toList.mkString(" , \n"))
        println("hello length ::::" + values.length)
        if (!state.exists) {
          if (values.length == 0) {
            None
          } else {
            val latestValue = values.toList.maxBy(_.value)
            val employee = Employee(latestValue.value.toString, latestValue.value)
            state.update(employee)
            Some(employee)
          }
        } else {
          if (values.isEmpty) {
            val currentState = state.get
            Some(currentState)
          } else {
            val latestValue = values.toList.maxBy(_.value)
            val currentState = state.get
            val updated = currentState.copy(latestValue.value.toString, latestValue.value)
            state.update(currentState.copy(latestValue.value.toString, latestValue.value))
            Some(updated)
          }
        }
      }
    }

    val res: Dataset[Employee] = rcConfigDS
      .groupByKey(_.age)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother)
      .flatMap(emp => emp)

    res.writeStream.format("console")
      .outputMode(OutputMode.Update())
      .option("truncate", value = false)
      .option("checkpointLocation", "checkpoint1")
      .start()
  }
}
Since I am grouping by age, there should be at least one object in the iterator. Am I right in saying this? Why is the iterator coming in empty?
Answer:
Are you sure it is empty the first time you print it? That would be the only surprising part. You can traverse an iterator only once, so after the first call to values.toList it is exhausted, and every later call such as values.length or a second values.toList sees nothing. Assign the result of toList to a variable and stop using the iterator after that.
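To see the single-pass behaviour in isolation, here is a minimal sketch in plain Scala with no Spark involved. The names IteratorOnce, drainTwice and materializeOnce are made up for this illustration and do not come from the question.

```scala
object IteratorOnce {
  // Calls toList twice on the same iterator, as the code in the question does.
  def drainTwice(values: Iterator[Int]): (List[Int], List[Int]) = {
    val first = values.toList  // walks the iterator to its end
    val second = values.toList // nothing is left to read
    (first, second)
  }

  // Materializes the iterator once and works only with the resulting list.
  def materializeOnce(values: Iterator[Int]): (Int, Option[Int]) = {
    val all = values.toList
    val largest = if (all.isEmpty) None else Some(all.max)
    (all.length, largest)
  }

  def main(args: Array[String]): Unit = {
    println(drainTwice(Iterator(3, 1, 2)))
    println(materializeOnce(Iterator(3, 1, 2)))
  }
}
```

In drainTwice the second list comes back empty, which matches what the println calls in the question observe. In materializeOnce both the length and the maximum are computed from the saved list, so nothing depends on reading the iterator again.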
Better yet, change your logic so that a single pass is enough and drop toList altogether (you can call maxBy directly on the Iterator, but only once). The idea is to avoid loading all of the data into memory at once when dealing with a large dataset.
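A one-pass version of the state update might look like the sketch below. To keep it runnable without Spark, the decision logic is pulled out into a pure function. OnePassUpdate and nextState are hypothetical names, and Rate2 omits the timestamp field for brevity. The hasNext guard is there because maxBy throws on an empty iterator, and hasNext itself does not consume any element.

```scala
object OnePassUpdate {
  case class Employee(id: String, value: Long)
  case class Rate2(value: Long, age: Int) // simplified: timestamp omitted

  // Returns the state to store, reading `values` exactly once.
  def nextState(current: Option[Employee], values: Iterator[Rate2]): Option[Employee] =
    if (values.hasNext) {
      val latest = values.maxBy(_.value) // the only traversal of the iterator
      Some(Employee(latest.value.toString, latest.value))
    } else {
      current // no new rows for this key: keep whatever was stored
    }

  def main(args: Array[String]): Unit = {
    println(nextState(None, Iterator(Rate2(40, 10), Rate2(120, 10), Rate2(80, 10))))
    println(nextState(Some(Employee("120", 120)), Iterator.empty))
  }
}
```

Inside updateRateAnother this could be called as nextState(state.getOption, values), followed by state.update on the result when it is defined. Any debug printing would then have to work from the returned value rather than from values, since the iterator is spent after maxBy.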