I create a new consumer with this configuration:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": addresses,
    "group.id":          "my_group",
    "auto.offset.reset": "earliest",
})
if err != nil {
    panic(err)
}

topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
    panic(err)
}
Then I produce two events with the following code and consume one of them:
events := []map[string]string{
    {"name": "Foo"},
    {"name": "Bar"},
}

err = p.ProduceEvent(events[0]) // there is a wrapper to produce events
err = p.ProduceEvent(events[1])

res, err := c.ReadMessage(100 * time.Second)

time.Sleep(20 * time.Second)
c.Close()
When I describe the group with
watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe
the results at each step are:
I can't understand why the lag ends up at zero! I consumed only one event. It seems like weird behaviour to me that Close would change the offset. Any clue is appreciated.
CodePudding user response:
ReadMessage wraps Poll. Poll fetches a batch of messages and buffers them locally. Since your consumer auto-commits offsets (enable.auto.commit defaults to true), it will commit all the fetched messages, even the ones that are only cached locally and that your application has not yet processed. That is why you see no lag after closing the consumer.
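One way to check this for yourself (a rough sketch, not from the original post; it reuses the c and topic variables from the question and assumes fmt is imported) is to ask the consumer for its committed offsets just before closing:

// Sketch: inspect the committed offset for partition 0 before Close.
tp := kafka.TopicPartition{Topic: &topic, Partition: 0}
committed, err := c.Committed([]kafka.TopicPartition{tp}, 5000) // timeout in ms
if err != nil {
    panic(err)
}
for _, part := range committed {
    // With auto commit left on, this may already point past messages
    // your application never processed.
    fmt.Printf("partition %d committed offset %v\n", part.Partition, part.Offset)
}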
librdkafka (and thus confluent-kafka-go) does not have an equivalent of max.poll.records, so if you want to control exactly which offsets get committed, you'll need to disable automatic offset storing and store offsets yourself with StoreOffsets, marking only the messages you have actually processed: https://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016
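For reference, here is a minimal sketch of that pattern, not a definitive implementation: it reuses the addresses, topic, and group from the question, the process handler is hypothetical, and error handling is trimmed to the essentials. Automatic offset storing is turned off while auto commit stays on, so the background committer only ever commits offsets your code has explicitly stored:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":        addresses,
    "group.id":                 "my_group",
    "auto.offset.reset":        "earliest",
    "enable.auto.offset.store": false, // don't store offsets as messages are fetched
    // enable.auto.commit stays at its default (true): the background
    // committer will only commit offsets explicitly stored below.
})
if err != nil {
    panic(err)
}
if err = c.SubscribeTopics([]string{"testTopic"}, nil); err != nil {
    panic(err)
}

msg, err := c.ReadMessage(100 * time.Second)
if err == nil {
    process(msg) // hypothetical handler for your application logic
    // Store offset+1: the committed offset is the offset of the
    // next message to consume, not the one just processed.
    msg.TopicPartition.Offset++
    if _, err = c.StoreOffsets([]kafka.TopicPartition{msg.TopicPartition}); err != nil {
        panic(err)
    }
}
c.Close()

With this in place, closing the consumer should leave the lag at whatever you have genuinely not processed, since prefetched-but-unstored offsets are never committed.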