Kafka Scala: How to get offset value from OffsetAndMetadata class

Time:05-23

I'm looping over the offsets three times:

  1. Get TopicPartition & offset
  2. Get TopicPartition & OffsetAndMetadata
  3. Get the delta between the producer & consumer

I'm wondering if I can get the offset value from OffsetAndMetadata but I'm not sure how. I couldn't find examples online to get this value. Any help is appreciated, thank you!

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignore SQS / JMS sources and their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //        "2" : 39376,
      //      } ...
      .foreach(source => {

        /// Map[Topic,Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val offsets: Map[String, Int] = jsonOffsets(topic)
            val consumedPartitions = new ListBuffer[TopicPartition]()
            val topicOffsetList = new ListBuffer[Int]()

            val mapTopicPartitionOffset = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val offset = offsets(partition).toLong
                (tp -> offset)
              })
              .toMap

            val mapTopicPartition = offsets
              .keys
              .map(partition => {
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(offsets(partition).toLong)
                (tp -> oam)
              })
              .toMap


            for ((topicPartition, _) <- mapTopicPartitionOffset) {
              consumedPartitions += topicPartition
            }

            try {
              val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)

              for((topicPartition,offset) <- mapTopicPartitionOffset){

                val bbCurrentOffset =  offset

                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)

                // Partition offset delta
                val delta = partitionLatestOffset - bbCurrentOffset

                // delta is a Long, while the buffer holds Int values
                topicOffsetList += delta.abs.toInt
              }
            } catch {
              case e: Exception => {
                log.error(s"${consumerGroupId} Could not get Kafka offset", e)
              }
            }

            try {
              kafkaConsumer.commitSync(mapTopicPartition.asJava)
            } catch {
              case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            log.info("consumerGroupId: " + consumerGroupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
          })

      })
  }

CodePudding user response:

You cannot get an offset value without both a topic and a partition. You're already iterating over each topic, and your JSON shows that the partitions are keys in a map, so this question is simply "how do you get values from a map?"

More importantly, do you actually need to get the offset from OffsetAndMetadata when you have the same value in two other places?

More specifically, you don't need both mapTopicPartitionOffset and mapTopicPartition. Both contain exactly the same information; one just wraps the offset in an object whose metadata property is empty.
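As a minimal sketch of that point, assuming the kafka-clients library is on the classpath: the helper names toCommitMap and plainOffsets are hypothetical, while OffsetAndMetadata.offset() and metadata() are the real accessors. The sketch builds the commit-ready map once and reads the plain offsets back out of it, so a second map is not needed.

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

object OffsetAndMetadataDemo {
  // Hypothetical helper: build the map that commitSync expects,
  // from the per-topic JSON offsets (partition id as a String key).
  def toCommitMap(topic: String, offsets: Map[String, Int]): Map[TopicPartition, OffsetAndMetadata] =
    offsets.map { case (partition, offset) =>
      new TopicPartition(topic, partition.toInt) -> new OffsetAndMetadata(offset.toLong)
    }

  // Hypothetical helper: recover the plain Long offsets from the same map
  // by calling offset() on each OffsetAndMetadata value.
  def plainOffsets(toCommit: Map[TopicPartition, OffsetAndMetadata]): Map[TopicPartition, Long] =
    toCommit.map { case (tp, oam) => tp -> oam.offset() }
}
```

For example, plainOffsets(toCommitMap("rcs-enriched-event", Map("8" -> 31376))) maps partition 8 of that topic back to 31376.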

You also have the offsets map, whose values are all the offsets for the topic you're currently iterating over, and whose keys are its (consumed) partitions.

Each of these contains the offset values that you want.
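To illustrate reading the values straight from the offsets map, here is a rough sketch in plain Scala with no Kafka dependency. The names LagSketch and totalLag are made up for illustration, and endOffsets stands in for whatever kafkaConsumer.endOffsets returns, converted to a Scala map keyed by partition number. Treating a partition with no known end offset as having zero lag is an assumption of this sketch, not something the question specifies.

```scala
object LagSketch {
  // offsets:    partition id (String key, as in the JSON) -> offset consumed so far
  // endOffsets: partition id -> latest offset on the broker (assumed input shape)
  def totalLag(offsets: Map[String, Int], endOffsets: Map[Int, Long]): Long =
    offsets.toSeq.map { case (partition, consumed) =>
      // Assumption: a partition missing from endOffsets contributes zero lag.
      val latest = endOffsets.getOrElse(partition.toInt, consumed.toLong)
      (latest - consumed.toLong).abs
    }.sum
}
```

Converting to a Seq before mapping keeps one lag value per partition, even when two partitions happen to have the same lag.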

You only need mapTopicPartition for commitSync, so try this:

.foreach(topic => {
    val offsets: Map[String, Int] = jsonOffsets(topic)
    // for getting end offsets
    val consumedPartitions = new ListBuffer[TopicPartition]()
    // convert to values accepted by Kafka Consumer API
    val toCommit = offsets
          .keys
          .map(partition => {
            val tp = new TopicPartition(topic, partition.toInt)
            consumedPartitions += tp
            val oam = new OffsetAndMetadata(offsets(partition).toLong)
            (tp -> oam)
         })
        .toMap

    // could also use toCommit.keys instead of separate list 
    val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
 
    for((topicPartition,oam) <- toCommit) {

        val bbCurrentOffset = oam.offset() 
        // or offsets(topicPartition.partition.toString), since the map keys are partition ids as strings