kafka consumer with confluent-kafka-go change offset

Time:07-28

I create a new consumer with this configuration:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": addresses,
    "group.id":          "my_group",
    "auto.offset.reset": "earliest",
})
if err != nil {
    panic(err)
}
topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
    panic(err)
}

Then I produce events based on the following code and consume one event:

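events := []map[string]string{
    {"name": "Foo"},
    {"name": "Bar"},
}

// p.ProduceEvent is a wrapper that produces a single event.
if err = p.ProduceEvent(events[0]); err != nil {
    panic(err)
}
if err = p.ProduceEvent(events[1]); err != nil {
    panic(err)
}

// Consume exactly one event, then wait before closing the consumer.
res, err := c.ReadMessage(100 * time.Second)
if err != nil {
    panic(err)
}
fmt.Println(string(res.Value))
time.Sleep(20 * time.Second)

c.Close()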

When I describe the group with watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe, the result at each step is:

  1. after producing the events: [screenshot not available]

  2. when I consume one event: [screenshot not available]

  3. after closing the consumer: [screenshot not available]

I can't understand why the lag is zero in the end! I consumed just one event. It seems weird to me that Close would change the offset. Any clue is appreciated.

CodePudding user response:

ReadMessage wraps Poll. Poll fetches a batch of messages and buffers them locally. Because auto commit is enabled (enable.auto.commit defaults to true, and your configuration does not override it), the consumer commits offsets for all fetched messages, including the ones that are only cached locally and that your application has not processed yet. That is why you see no lag after closing the consumer.

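librdkafka (and thus confluent-kafka-go) has no equivalent of the Java client's max.poll.records setting, so if you want to control exactly which offsets get committed, you'll need to disable automatic offset storing (enable.auto.offset.store=false) and store each offset yourself with StoreOffsets once the message has been processed; auto commit then commits only what you stored. Alternatively, disable auto commit entirely and commit manually. See: https://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016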
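A minimal sketch of the manual-store approach, under a few assumptions: the import path is the v1 module (v2 releases use github.com/confluentinc/confluent-kafka-go/v2/kafka), the broker address localhost:9092 is taken from the describe command in the question, and the "processing" step is just a print. It needs a running broker, so treat it as an outline rather than a tested program.

```go
package main

import (
	"fmt"
	"time"

	// Assumed import path (v1 module); adjust for the version you use.
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092", // assumed broker address
		"group.id":                 "my_group",
		"auto.offset.reset":        "earliest",
		"enable.auto.commit":       true,  // keep the background commits
		"enable.auto.offset.store": false, // but commit only offsets stored below
	})
	if err != nil {
		panic(err)
	}
	// Close commits whatever has been stored so far.
	defer c.Close()

	if err := c.SubscribeTopics([]string{"testTopic"}, nil); err != nil {
		panic(err)
	}

	msg, err := c.ReadMessage(100 * time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Printf("processed %s\n", string(msg.Value)) // placeholder for real processing

	// A committed offset names the next message to read, hence the +1.
	tp := msg.TopicPartition
	tp.Offset++
	if _, err := c.StoreOffsets([]kafka.TopicPartition{tp}); err != nil {
		panic(err)
	}
}
```

With this setup, only the offset of the one processed message should be committed when the consumer closes, so the group would be expected to report a lag of 1 for the two events produced in the question.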
