Home > database >  How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?
How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?

Time:02-20

I need to retrive last N messages of a topic using confluent-kafka-python.

I've been reading https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# for a day, but no finding any appropriate method for getting the offset of the last message, thus I cannot calculate the offset for consumer to start with.

Please help. Thanks!

CodePudding user response:

You need the get_watermark_offsets() function of the Consumer. You call it with a list of TopicPartition and it returns a tuple (int, int) (low, high) for each partition.

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.get_watermark_offsets

Something like this:


from confluent_kafka import Consumer, TopicPartition

# create the Consumer with a connection to your brokers

topic_name = "my.topic"

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]

offsets = consumer.get_watermark_offsets(topicparts)

for p in enumerate(offsets):
    msg = "partition {p} starting offset {so} last offset {lo}"
    print(msg.format(p=p, so=offsets[p][0], lo=offsets[p][1]))
  • Related