Use case:
I want to aggregate data over a fixed period of time and then forward the result downstream. Since the built-in suppress feature does not support wall-clock time, I implemented this myself with a transformer.
Once the time window has closed, I forward the aggregated data downstream and delete it from the state store. I tested the behaviour with a limited amount of data, i.e. after all data has been processed, the state store should be empty again and memory usage should decrease. Unfortunately, memory usage always stays at the same level.
SuppressTransformer.scala
import java.time.{Duration, Instant}
import java.time.temporal.ChronoUnit
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier, Windowed}
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, To}
import org.apache.kafka.streams.state.SessionStore
import scala.collection.mutable

class SuppressTransformer[T](stateStoreName: String, windowDuration: Duration) extends Transformer[String, T, KeyValue[String, T]] {
  val scheduleInterval: Duration = Duration.ofSeconds(180)
  private val keySet = mutable.HashSet.empty[String]
  var context: ProcessorContext = _
  var store: SessionStore[String, Array[T]] = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    this.store = context.getStateStore(stateStoreName).asInstanceOf[SessionStore[String, Array[T]]]
    this.context.schedule(
      scheduleInterval,
      PunctuationType.WALL_CLOCK_TIME,
      _ => {
        // Iterate over a snapshot, because keySet is modified inside the loop
        for (key <- keySet.toList) {
          val storeEntry = store.fetch(key)
          try {
            while (storeEntry.hasNext) {
              val keyValue: KeyValue[Windowed[String], Array[T]] = storeEntry.next()
              val windowedKey: Windowed[String] = keyValue.key
              val now = Instant.now()
              val windowAge: Long = ChronoUnit.SECONDS.between(windowedKey.window().startTime(), now)
              // Check if the window is exceeded. If yes, forward the data downstream
              if (windowedKey.window().start() > 0 && windowAge > windowDuration.toSeconds) {
                context.forward(key, keyValue.value, To.all().withTimestamp(now.toEpochMilli))
                context.commit()
                this.store.remove(windowedKey) // Delete entry from state store
                keySet -= key
              }
            }
          } finally {
            storeEntry.close() // Close iterator to avoid memory leak, even if forwarding throws
          }
        }
      }
    )
  }
  override def transform(key: String, value: T): KeyValue[String, T] = {
    // Remember the key so the punctuator can look up its sessions later
    // (adding an already present key is a no-op)
    keySet += key
    null // Nothing is emitted here; records are forwarded from the punctuator
  }

  override def close(): Unit = {}
}

class SuppressTransformerSupplier[T](stateStoreName: String, windowDuration: Duration) extends TransformerSupplier[String, T, KeyValue[String, T]] {
  override def get(): SuppressTransformer[T] = new SuppressTransformer(stateStoreName, windowDuration)
}
Topology.scala
val windowDuration = Duration.ofMinutes(5)
val stateStore: Materialized[String, util.ArrayList[Bytes], ByteArraySessionStore] =
  Materialized
    .as[String, util.ArrayList[Bytes]](
      new RocksDbSessionBytesStoreSupplier(stateStoreName, stateStoreRetention.toMillis)
    )

// Matches the topic itself and its numbered variants, e.g. "<topic>-1"
builder.stream[String, Bytes](Pattern.compile(topic + "(-\\d+)?"))
  .filter((k, _) => k != null)
  .groupByKey
  .windowedBy(SessionWindows `with` sessionWindowMinDuration `grace` sessionGracePeriodDuration)
  .aggregate(initializer = {
    new util.ArrayList[Bytes]()
  })(
    aggregator = (_: String, instance: Bytes, agg: util.ArrayList[Bytes]) => {
      agg.add(instance)
      agg
    },
    merger = (_: String, state1: util.ArrayList[Bytes], state2: util.ArrayList[Bytes]) => {
      state1.addAll(state2)
      state1
    }
  )(stateStore)
  .toStream
  .map((k, v) => (k.key(), v))
  .transform(new SuppressTransformerSupplier[util.ArrayList[Bytes]](stateStoreName, windowDuration), stateStoreName)
  .unsetRepartitioningRequired()
  .to(f"$topic-aggregated")
Answer:
I don't think this is a memory leak. It could be, but from what you describe it looks like normal JVM behavior.
The JVM takes as much memory as it is allowed to. This is the heap, and its maximum size is configured with the -Xmx option. Your state fills it up (I assume, based on the graph) and the objects are then garbage-collected. But the JVM normally does not return freed heap memory to the OS, which is why your pod always stays at its peak memory usage.
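To check whether this is what is happening, it can help to compare the heap that is actually in use with the heap the JVM has reserved from the OS. The following is only a minimal sketch using the standard java.lang.management API; the object name HeapReport and the 64 MiB test allocation are made up for illustration and are not part of the application above.

```scala
import java.lang.management.ManagementFactory

object HeapReport {
  private val MiB = 1024L * 1024L

  /** Returns (used, committed, max) heap sizes in MiB; max is -1 if undefined. */
  def heapMiB(): (Long, Long, Long) = {
    val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
    val max = if (heap.getMax < 0) -1L else heap.getMax / MiB
    (heap.getUsed / MiB, heap.getCommitted / MiB, max)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical workload: allocate about 64 MiB, then drop the reference.
    var block: Array[Array[Byte]] = Array.fill(64)(new Array[Byte](MiB.toInt))
    println(s"after allocation (used, committed, max): ${heapMiB()}")
    block = null
    System.gc() // Only a hint; the JVM is free to ignore it
    println(s"after GC         (used, committed, max): ${heapMiB()}")
  }
}
```

If "used" drops after the state store has been emptied while "committed" stays high, the objects were released and the pod's memory level only reflects heap that the JVM keeps reserved.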
There are a few garbage collectors that can return unused heap memory to the OS.
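Before switching collectors, it may be worth confirming which ones the JVM in the pod is actually running. A small sketch, again using only the standard java.lang.management API (the object name GcInfo is hypothetical); the reported names depend on the JVM vendor, version, and flags.

```scala
import java.lang.management.ManagementFactory

object GcInfo {
  /** Names of the garbage collectors registered in the running JVM. */
  def collectorNames(): List[String] = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans
    // Index-based access avoids depending on a specific Scala collection converter
    (0 until beans.size).map(i => beans.get(i).getName).toList
  }

  def main(args: Array[String]): Unit =
    collectorNames().foreach(name => println(s"active collector: $name"))
}
```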
I personally use the GC that is faster and let JVM take as much memory as it requires. At the end of the day, that's the power of pod isolation. I normally set the heap max to