Home > Blockchain >  how to implement seekToEnd() in Spring Kafka belonging to AbstractConsumerSeekAware class?
how to implement seekToEnd() in Spring Kafka belonging to AbstractConsumerSeekAware class?

Time:10-22

I am having a requirement. We have an spring boot kafka consumer app which is reading from a kafka topic. Our requirement is whenever the app goes down and comes up I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group? I have researched a little bit and used AbstractConsumerSeekAware to set the offset to the end using seekToEnd() as seen in the below code.

public class KafkaConsumer extends AbstractConsumerSeekAware {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;



    @KafkaListener(topics = "${topic.consumer}")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        //consuming the message from consumer Record.
        seekToEnd();
        doSomething();
        ack.acknowledge();
    }

However when we stopped the app and restarted it, it started reading from last offset where it left but we wanted to read only from the offset when the app started. How can we achieve this?

CodePudding user response:

That's not correct to do that in the @KafkaListener method. This one is really called only when consumer delivers records from the partition(s).

You must implement an onPartitionsAssigned() for that reason, so the consumer is going seek those partitions before starting polling them.

See more in docs: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek

  • Related