Scala Kafka Spark: Get all kafkaConsumer endOffSets and assign it to a val


The code below is inefficient: it queries the kafkaConsumer on every iteration of the inner loop (where it says // move code below). How do I move that call up to the // move it here marker so that I only make one request per topic? I believe I have to collect all the TopicPartitions from the jsonOffsets and pass them to kafkaConsumer.endOffsets in a single call, but I'm not sure how to do that.

endOffsets takes a collection of TopicPartition values.
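For reference, KafkaConsumer.endOffsets accepts a whole collection of partitions at once and returns the latest offset for each, so a single call can cover every partition of a topic. A minimal illustration (the topic name and partition numbers are placeholders):

val partitions = java.util.Arrays.asList(
  new TopicPartition("some-topic", 0),
  new TopicPartition("some-topic", 1))
// One request returns the latest offset for every partition asked for
val latest: java.util.Map[TopicPartition, java.lang.Long] =
  kafkaConsumer.endOffsets(partitions)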

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    event.progress.sources
      // Ignore sqs/jms sources and their offsets
      .filter(source => source.description.toLowerCase().contains("kafka"))
      // ex offset :
      //    "endOffset" : {
      //      "rcs-enriched-event" : {
      //        "8" : 31376,
      //        "11" : 39114,
      //      } ...
      .foreach(source => {

        // Map[Topic, Map[Partition, CurrentOffset]]
        val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
        jsonOffsets.keys.filter(key => topics.contains(key))
          .foreach(topic => {
            val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()

            // Collects the per-partition lag deltas for this topic
            val topicOffsetList = new ListBuffer[Long]()

            val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)

            
            // move it here

            offsets match {
              case Some(topicOffsetData) =>
                topicOffsetData.keys.foreach(partition => {
                  /// "4" : 34937
                  val tp = new TopicPartition(topic, partition.toInt)
                  val oam = new OffsetAndMetadata(topicOffsetData(partition).toLong)

                  val bbCurrentOffset = topicOffsetData(partition).toLong

                  // move code below
                  val kafkaPartitionOffset = kafkaConsumer.endOffsets(java.util.Arrays.asList(tp)) 

                  // latest offset
                  val partitionLatestOffset = kafkaPartitionOffset.get(tp)

                  // Log for a particular partition
                  val delta = partitionLatestOffset - bbCurrentOffset

                  topicOffsetList += delta.abs

                  topicPartitionMap.put(tp, oam)
                })
              case None => // topic keys come from jsonOffsets, so None is not expected
            }
            try {
              kafkaConsumer.commitSync(topicPartitionMap)
            } catch {
              case e: Exception => log.error(s"${groupId} Could not commit offset", e)
            }

            //log.info have the group id (unique id), the topic, cumulative consumer lag delta
            //push out to logger
            log.info("groupId: " + groupId + " topic: " + topic + " lagDeltaSum: " + topicOffsetList.sum)
          })

      })
  }

CodePudding user response:

It seems to me you should loop over the offsets twice: first to collect the partitions, then again to use the latest offsets fetched in a single endOffsets call.

.foreach(topic => {
    val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
    // First pass: collect every partition consumed for this topic
    val consumedPartitions = new ListBuffer[TopicPartition]()
    offsets match {
      case Some(topicOffsetData) =>
        topicOffsetData.keys.foreach(partition => {
          val tp = new TopicPartition(topic, partition.toInt)
          consumedPartitions += tp
        })
      case None =>
    }
    // One endOffsets request per topic, covering all of its partitions.
    // .asJava needs: import scala.collection.JavaConverters._
    // (or scala.jdk.CollectionConverters._ on Scala 2.13+)
    val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)
    
    offsets match {
      case Some(topicOffsetData) =>
         // Use latestOffsets here, as needed ...
      case None =>
    }
})
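
Putting it together, here is a minimal sketch of the reworked listener body. It assumes the same surrounding members as in the question (objectMapper, topics, kafkaConsumer, groupId, log) and is not the only way to structure the two passes:

import java.util.{HashMap => JHashMap}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
  event.progress.sources
    .filter(source => source.description.toLowerCase().contains("kafka"))
    .foreach(source => {
      // Map[Topic, Map[Partition, CurrentOffset]]
      val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
      jsonOffsets.keys.filter(topics.contains)
        .foreach(topic => {
          val topicOffsetData = jsonOffsets.getOrElse(topic, Map.empty[String, Int])

          // One endOffsets request per topic, covering every consumed partition
          val consumedPartitions = topicOffsetData.keys
            .map(partition => new TopicPartition(topic, partition.toInt))
            .toList
          val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)

          val topicPartitionMap = new JHashMap[TopicPartition, OffsetAndMetadata]()
          val lagDeltas = new ListBuffer[Long]()

          topicOffsetData.foreach { case (partition, currentOffset) =>
            val tp = new TopicPartition(topic, partition.toInt)
            // latestOffsets contains every partition we asked for, so get() is safe here
            val latest: Long = latestOffsets.get(tp)
            lagDeltas += (latest - currentOffset.toLong).abs
            topicPartitionMap.put(tp, new OffsetAndMetadata(currentOffset.toLong))
          }

          try {
            kafkaConsumer.commitSync(topicPartitionMap)
          } catch {
            case e: Exception => log.error(s"$groupId Could not commit offset", e)
          }

          log.info("groupId: " + groupId + " topic: " + topic + " lagDeltaSum: " + lagDeltas.sum)
        })
    })
}

Fetching all end offsets in one call avoids a broker round-trip per partition, which is the inefficiency the question describes.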