Home > Back-end >  Kafka Streams Scala Memory Leak
Kafka Streams Scala Memory Leak

Time:07-06

Following use case:
I want to aggregate data for a specific time and then downstream them. Since the built-in suppress-feature does not support wall clock time, I have to implement this on my own by using a transformer.
After the time window is closed I downstream the aggregated data and delete them from the state store. I tested the behaviour with a limited amount of data. I.e. after all data have been processed the state store should be empty again and the memory should decrease. Unfortunately the memory always stays at the same level.

SuppressTransformer.scala

class SuppressTransformer[T](stateStoreName: String, windowDuration: Duration) extends Transformer[String, T, KeyValue[String, T]] {
  val scheduleInterval: Duration = Duration.ofSeconds(180)
  private val keySet = mutable.HashSet.empty[String]
  var context: ProcessorContext = _
  var store: SessionStore[String, Array[T]] = _


  override def init(context: ProcessorContext): Unit = {
    this.context = context;
    this.store = context.getStateStore(stateStoreName).asInstanceOf[SessionStore[String, Array[T]]]

    this.context.schedule(
      scheduleInterval,
      PunctuationType.WALL_CLOCK_TIME,
      _ => {
        for (key <- keySet) {
          val storeEntry = store.fetch(key)
          while (storeEntry.hasNext) {
            val keyValue: KeyValue[Windowed[String], Array[T]] = storeEntry.next()
            val peekKey = keyValue.key
            val now = Instant.now()
            val windowAge: Long = ChronoUnit.SECONDS.between(peekKey.window().startTime(), now)

            if (peekKey.window().start() > 0 && windowAge > windowDuration.toSeconds) {  // Check if window is exceeded. If yes, downstream data
              val windowedKey: Windowed[String] = keyValue.key
              val storeValue = keyValue.value
              context.forward(key, storeValue, To.all().withTimestamp(now.toEpochMilli))
              context.commit()
              this.store.remove(windowedKey) // Delete entry from state store
              keySet -= key
            }
          }
          storeEntry.close()  // Close iterator to avoid memory leak
        }
      }
    )
  }

  override def transform(key: String, value: T): KeyValue[String, T] = {
    if (!keySet.contains(key)) {
      keySet  = key
    }
    null
  }

  override def close(): Unit = {}
}

class SuppressTransformerSupplier[T](stateStoreName: String, windowDuration: Duration) extends TransformerSupplier[String, T, KeyValue[String, T]] {
  override def get(): SuppressTransformer[T] = new SuppressTransformer(stateStoreName, windowDuration)
}

Topology.scala

val windowDuration = Duration.ofMinutes(5)
val stateStore: Materialized[String, util.ArrayList[Bytes], ByteArraySessionStore] =
  Materialized
    .as[String, util.ArrayList[Bytes]](
      new RocksDbSessionBytesStoreSupplier(stateStoreName,
        stateStoreRetention.toMillis)
    )

builder.stream[String, Bytes](Pattern.compile(topic   "(-\\d )?"))
  .filter((k, _) => k != null)
  .groupByKey
  .windowedBy(SessionWindows `with` sessionWindowMinDuration `grace` sessionGracePeriodDuration)
  .aggregate(initializer = {
    new util.ArrayList[Bytes]()
  }
  )(aggregator = (_: String, instance: Bytes, agg: util.ArrayList[Bytes]) => {
    agg.add(instance)
    agg
  }, merger = (_: String, state1: util.ArrayList[Bytes], state2: util.ArrayList[Bytes]) => {
    state1.addAll(state2)
    state1
  }
  )(stateStore)
  .toStream
  .map((k, v) => (k.key(), v))
  .transform(new SuppressTransformerSupplier[util.ArrayList[Bytes]](stateStoreName, windowDuration), stateStoreName)
  .unsetRepartitioningRequired()
  .to(f"$topic-aggregated")

enter image description here

CodePudding user response:

I don't think that is a memory leak. I mean it could be. But from what you mentioned, it looks like normal Java behavior.

What happens is that JVM takes all the memory that it can. It is the heap memory and the maximum is configured by the Xmx option. Your state takes it all (I assume, based on the graph) and then releases the objects. But JVM normally doesn't release the memory back to the OS. That is the reason your pod is always at its highest.

There are a few garbage colletors that could possibly do that for you.

I personally use the GC that is faster and let JVM take as much memory as it requires. At the end of the day, that's the power of pod isolation. I normally set the heap max to

  • Related