I'm looping thrice on offsets
:
- Get
TopicPartition
&offset
- Get
TopicPartition
&OffsetAndMetadata
- Get the delta between the producer & consumer
I'm wondering if I can get the offset
value from OffsetAndMetadata
but I'm not sure how. I couldn't find examples online to get this value. Any help is appreciated, thank you!
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
event.progress.sources
// Ignoring sqs / jms sources their offsets
.filter(source => source.description.toLowerCase().contains("kafka"))
// ex offset :
// "endOffset" : {
// "rcs-enriched-event" : {
// "8" : 31376,
// "11" : 39114,
// "2" : 39376,
// } ...
.foreach(source => {
/// Map[Topic,Map[Partition, CurrentOffset]]
val jsonOffsets = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Int]]])
jsonOffsets.keys.filter(key => topics.contains(key))
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
val consumedPartitions = new ListBuffer[TopicPartition]()
val topicOffsetList = new ListBuffer[Int]()
val mapTopicPartitionOffset = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val offset = offsets(partition).toLong
(tp -> offset)
})
.toMap
val mapTopicPartition = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
val oam = new OffsetAndMetadata(offsets(partition).toLong)
(tp -> oam)
})
.toMap
for(topicPartition <- mapTopicPartitionOffset){
consumedPartitions = topicPartition
}
try {
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,offset) <- mapTopicPartitionOffset){
val bbCurrentOffset = offset
// latest offset
val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)
// Partition offset delta
val delta = partitionLatestOffset - bbCurrentOffset
topicOffsetList = delta.abs
}
} catch {
case e: Exception => {
log.error(s"${consumerGroupId} Could not get Kafka offset", e)
}
}
try {
kafkaConsumer.commitSync(mapTopicPartition.asJava)
} catch {
case e: Exception => log.error(s"${consumerGroupId} Could not commit offset", e)
}
//log.info have the group id (unique id), the topic, cumulative consumer lag delta
log.info("consumerGroupId: " consumerGroupId " topic: " topic " lagDeltaSum: " topicOffsetList.sum)
})
})
}
CodePudding user response:
You cannot get an offset value without both a topic and a partition. You're already iterating for each topic, and your JSON shows that you have keys in an map of partitions, so this question is simply "how do you get values from a map?"
More importantly, do you actually need to get the offset from OffsetAndMetadata
when you have the same value two other places?
More specifically, you don't need both mapTopicPartitionOffset
and mapTopicPartition
. Each contain the exact same information, one just has an object as a value with null metadata property.
You also have offsets
map, whose values are all the offsets for the current topic you're iterating, and the keys are its (consumed) partitions.
Each contain the offset values that you want
You only need mapTopicPartition
to use with commitSync
, so try this
.foreach(topic => {
val offsets: Map[String, Int] = jsonOffsets(topic)
// for getting end offsets
val consumedPartitions = new ListBuffer[TopicPartition]()
// convert to values accepted by Kafka Consumer API
val toCommit = offsets
.keys
.map(partition => {
val tp = new TopicPartition(topic, partition.toInt)
consumedPartitions = tp
val oam = new OffsetAndMetadata(offsets.get(partition).toLong)
(tp -> oam)
})
.toMap
// could also use toCommit.keys instead of separate list
val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)
for((topicPartition,oam) <- toCommit) {
val bbCurrentOffset = oam.offset()
// or offsets.get(topicPartition.partition)