Kafka Streams Application with Background Thread

Time:07-12

We are developing a Kafka Streams application in Scala that reads requests from a Kafka topic, processes each request, and writes the response to a sink Kafka topic.

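  // Imports used by this snippet; implicit Serdes for the key, request and
  // response types must also be in scope for stream[...] and to(...).
  import org.apache.kafka.streams.Topology
  import org.apache.kafka.streams.scala.StreamsBuilder
  import org.apache.kafka.streams.scala.ImplicitConversions._

  def transformRecords(inputTopic: String, outputTopic: String): Topology = {
    val builder: StreamsBuilder = new StreamsBuilder()
    builder
      .stream[String, RequestSchema](inputTopic) // read requests
      .map(transformToResponseSchema)            // (key, request) => (key, response)
      .to(outputTopic)                           // write responses to the sink topic

    builder.build()
  }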

To implement transformToResponseSchema, we keep an in-memory key -> value map that must be refreshed on a schedule from a third-party API; without that map, a request cannot be transformed into a response.
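For reference, the background-thread approach described above can be sketched with only the JDK's ScheduledExecutorService. This is a minimal sketch under assumptions: `ScheduledMap`, `fetchMappingsFromApi` (a stub standing in for the third-party API), and the fields of `RequestSchema`/`ResponseSchema` are all hypothetical. The whole immutable map is swapped through an AtomicReference, so stream threads never observe a half-updated map.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal

// Hypothetical holder: refreshes an immutable lookup map on a fixed schedule.
// `fetch` stands in for the third-party API call (an assumption of this sketch).
final class ScheduledMap[K, V](periodMillis: Long)(fetch: () => Map[K, V]) extends AutoCloseable {
  // Load once up front so the stream never sees an empty map.
  private val current = new AtomicReference[Map[K, V]](fetch())

  // Single daemon thread, so it does not keep the JVM alive on shutdown.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "mapping-refresher")
      t.setDaemon(true)
      t
    }
  })

  // Replace the whole map every `periodMillis`. Failures are caught because
  // scheduleAtFixedRate stops rescheduling a task once it throws.
  scheduler.scheduleAtFixedRate(
    new Runnable {
      override def run(): Unit =
        try current.set(fetch())
        catch { case NonFatal(e) => System.err.println(s"mapping refresh failed: $e") }
    },
    periodMillis, periodMillis, TimeUnit.MILLISECONDS)

  // Lock-free read of the latest snapshot; safe to call from any stream thread.
  def snapshot: Map[K, V] = current.get()

  override def close(): Unit = scheduler.shutdownNow()
}

// Hypothetical request/response types and API stub, for illustration only.
final case class RequestSchema(code: String)
final case class ResponseSchema(code: String, label: String)

def fetchMappingsFromApi(): Map[String, String] = Map("a" -> "Alpha") // stub

// Refresh every 30 minutes (an arbitrary example interval).
val mappings = new ScheduledMap[String, String](30 * 60 * 1000L)(() => fetchMappingsFromApi())

// Shape expected by KStream.map in the Scala DSL: (K, V) => (KR, VR).
def transformToResponseSchema(key: String, req: RequestSchema): (String, ResponseSchema) =
  (key, ResponseSchema(req.code, mappings.snapshot.getOrElse(req.code, "unknown")))
```

The trade-off is an extra thread to manage (it should be closed together with the KafkaStreams instance), in exchange for never blocking a stream thread on the API call.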

Is there a built-in Kafka Streams mechanism we can use for this, rather than a background thread that refreshes the map on a schedule?

Answer:

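Regarding "must update my dictionary by going to another third party API": a Guava LoadingCache built with a CacheLoader can do that. When you call get(key) and the key is not cached yet, the loader's load method runs (on the calling thread) to fetch the value and store it in the cache. If you also configure refreshAfterWrite, a stale entry is reloaded on the first access after that interval; wrapping the loader with CacheLoader.asyncReloading makes those reloads run on an executor instead of blocking the caller.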

An alternative is to "export" the third-party data into a Kafka topic, read that topic as a KTable, and join the request stream against it, all within the Kafka Streams API.

Answer:

Here is what I wrote for one of my jobs (I hope they won't sue me for disseminating such a brilliant piece of corporate know-how :) ):

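import scala.concurrent.duration._

// Caches the result of `refresh` and recomputes it on the first call after
// `frequency` has elapsed. No background thread: the refresh runs
// synchronously on whichever thread calls apply().
class RefreshingValue[T](frequency: FiniteDuration)(refresh: => T) extends Function0[T] {
  // One (value, deadline) pair per thread, so no locking is needed.
  private lazy val cache = new ThreadLocal[(T, Deadline)] {
    override def initialValue(): (T, Deadline) = (refresh, Deadline.now + frequency)
  }

  def apply(): T = cache.get match {
    case (_, deadline) if deadline.isOverdue() =>
      val fresh = refresh
      cache.set((fresh, Deadline.now + frequency))
      fresh
    case (v, _) => v
  }
}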

The way you use it is something like this:

val mappings = new RefreshingValue(30.minutes)(refreshMappingsFromAPI)

Then, wherever you need the map, call mappings(): it returns the cached value, or refreshes it first if it is stale. No background thread or third-party library is needed; the refresh runs synchronously on whichever thread calls mappings() after the deadline has passed.
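Because RefreshingValue stores its value in a ThreadLocal, each Kafka Streams thread (see num.stream.threads) keeps its own copy and calls the API on its own schedule. If one copy shared by all threads is preferred, a variant along these lines could work. This is my own sketch, not the answerer's code, and the name `SharedRefreshingValue` is hypothetical: a volatile field is read without locking, and double-checked locking ensures only one caller refreshes a stale value.

```scala
import scala.concurrent.duration._

// Hypothetical variant: one value shared by all threads instead of one per thread.
// Still no background thread: the first caller to see a stale value refreshes it
// synchronously while other callers that also saw it stale wait on the lock.
final class SharedRefreshingValue[T](frequency: FiniteDuration)(refresh: => T) extends (() => T) {
  // Volatile so readers outside the lock see the latest (value, deadline) pair.
  @volatile private var entry: (T, Deadline) = (refresh, Deadline.now + frequency)

  def apply(): T = {
    val (value, deadline) = entry
    if (!deadline.isOverdue()) value // fast path: no locking
    else synchronized {
      // Re-check under the lock: another thread may have refreshed already.
      if (entry._2.isOverdue()) entry = (refresh, Deadline.now + frequency)
      entry._1
    }
  }
}
```

It is used the same way, e.g. `new SharedRefreshingValue(30.minutes)(refreshMappingsFromAPI)`. The cost is that callers arriving during a refresh block until it finishes, whereas the ThreadLocal version only ever blocks the thread doing its own refresh.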
