The code below is inefficient: it calls kafkaConsumer.endOffsets on every iteration of the partition loop (where it says <!-- move code below -->). How do I move that call up to <!-- move it here --> so I only have to request the end offsets once per topic? I believe I have to collect all the TopicPartitions for a topic from the jsonOffsets and pass them to kafkaConsumer.endOffsets in a single call, but I'm not sure how to do that. endOffsets takes a collection of TopicPartitions.
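As a point of reference (not part of my listener), endOffsets accepts a whole java.util.Collection of TopicPartition and returns every end offset in one call, e.g. this minimal sketch with an illustrative topic name and partition numbers, assuming a configured kafkaConsumer and log are in scope:

import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

// Build all TopicPartitions for the topic up front ...
val partitions: Seq[TopicPartition] =
  Seq(0, 1, 2).map(p => new TopicPartition("some-topic", p))

// ... then ask the broker once for their end offsets.
// endOffsets returns java.util.Map[TopicPartition, java.lang.Long].
val latestOffsets = kafkaConsumer.endOffsets(partitions.asJava)

partitions.foreach { tp =>
  log.info(s"${tp.topic}-${tp.partition} end offset: ${latestOffsets.get(tp)}")
}

For context, here is the current listener: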
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
  event.progress.sources
    // Ignoring sqs / jms sources and their offsets
    .filter(source => source.description.toLowerCase().contains("kafka"))
    // Example endOffset value:
    // "endOffset" : {
    //   "rcs-enriched-event" : {
    //     "8" : 31376,
    //     "11" : 39114,
    //   } ...
    .foreach(source => {
      // Map[Topic, Map[Partition, CurrentOffset]]
      val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
      jsonOffsets.keys.filter(key => topics.contains(key))
        .foreach(topic => {
          val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
          // Map[Partition, CurrentOffset]
          val topicOffsetList = new ListBuffer[Long]()
          val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)
          <!-- move it here -->
          offsets match {
            case Some(topicOffsetData) =>
              topicOffsetData.keys.foreach(partition => {
                // "4" : 34937
                val tp = new TopicPartition(topic, partition.toInt)
                val oam = new OffsetAndMetadata(topicOffsetData(partition).toLong)
                val bbCurrentOffset = topicOffsetData(partition).toLong
                <!-- move code below -->
                val kafkaPartitionOffset = kafkaConsumer.endOffsets(java.util.Arrays.asList(tp))
                // latest offset
                val partitionLatestOffset = kafkaPartitionOffset.get(tp)
                // Lag delta for this particular partition
                val delta = partitionLatestOffset - bbCurrentOffset
                topicOffsetList += delta.abs
                topicPartitionMap.put(tp, oam)
              })
          }
          try {
            kafkaConsumer.commitSync(topicPartitionMap)
          } catch {
            case e: Exception => log.error(s"${groupId} Could not commit offset", e)
          }
          // Log the group id (unique id), the topic, and the cumulative consumer lag delta
          log.info(s"groupId: $groupId topic: $topic lagDeltaSum: ${topicOffsetList.sum}")
        })
    })
}
CodePudding user response:
Seems to me you should loop over offsets twice: once to collect the TopicPartitions for the topic, and a second time to use the latest offsets fetched for them.
.foreach(topic => {
  val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)

  // First pass: collect every consumed TopicPartition for this topic
  val consumedPartitions = new ListBuffer[TopicPartition]()
  offsets match {
    case Some(topicOffsetData) =>
      topicOffsetData.keys.foreach(partition => {
        val tp = new TopicPartition(topic, partition.toInt)
        consumedPartitions += tp
      })
    case None =>
  }

  // Fetch latest offsets: a single endOffsets request per topic
  // (asJava needs import scala.collection.JavaConverters._)
  val latestOffsets = kafkaConsumer.endOffsets(consumedPartitions.asJava)

  // Second pass
  offsets match {
    case Some(topicOffsetData) =>
      // Use latest offsets, as needed ...
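To make the two-pass idea concrete, one possible shape for the whole topic loop from the question is sketched below. This is only an illustration under the question's own assumptions: kafkaConsumer, jsonOffsets, topics, groupId and log are defined as in the listener above, and scala.collection.JavaConverters._ is imported for asJava.

.foreach(topic => {
  val topicPartitionMap = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
  val offsets: Option[Map[String, Int]] = jsonOffsets.get(topic)

  offsets match {
    case Some(topicOffsetData) =>
      // Pass 1: build every TopicPartition for this topic
      val partitions = topicOffsetData.keys.map(p => new TopicPartition(topic, p.toInt)).toSeq

      // One endOffsets request per topic instead of one per partition
      val latestOffsets = kafkaConsumer.endOffsets(partitions.asJava)

      // Pass 2: compute the per-partition lag delta and stage the commits
      val lagDeltaSum = partitions.map { tp =>
        val bbCurrentOffset = topicOffsetData(tp.partition.toString).toLong
        topicPartitionMap.put(tp, new OffsetAndMetadata(bbCurrentOffset))
        (latestOffsets.get(tp) - bbCurrentOffset).abs
      }.sum

      try {
        kafkaConsumer.commitSync(topicPartitionMap)
      } catch {
        case e: Exception => log.error(s"${groupId} Could not commit offset", e)
      }
      log.info(s"groupId: $groupId topic: $topic lagDeltaSum: $lagDeltaSum")
    case None =>
  }
})

The point of the restructuring is that endOffsets is a broker round trip, so issuing it once per topic with all of that topic's partitions, rather than once per partition inside the inner loop, removes the per-partition network call.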