Stop Kafka Consumer after consuming the log end offset using reactor kafka

Time:11-09

I have a requirement to consume all messages from a topic, from the beginning up to the latest message offset in the partition, and then stop the consumer from listening to the topic. I have written the following code snippet:

ReceiverOptions<String, byte[]> receiverOptions = this.receiverOptions.subscription(Collections.singleton( topicName ) )
          .addAssignListener( partitions -> {
              log.debug( "starting to {} offset", startOffset );
              partitions.forEach( element -> element.seekToBeginning() );
          } )
          .addRevokeListener( partitions -> log.debug( "partition revoked = {}", partitions ) );

Flux<ReceiverRecord<String, byte[]>> fluxKafka = KafkaReceiver.create( receiverOptions ).receive();

disposable = fluxKafka.publishOn( Schedulers.fromExecutor( scheduledExecutor ) )
          .subscribe( record -> {

              log.debug( "received record [offset = {}, message = {}, timestamp = {}]", record.offset(),
                            new String( record.value() ), record.timestamp() );

              T message = avroDeserializer.deserialize( record.value() );

              Optional.ofNullable( onMessageHandler )
                            .ifPresent( handler -> handler.accept( message ) );

              record.receiverOffset().acknowledge();
          } );

I don't know which approach to take. I'm thinking of getting the current partition position(), but I'm not sure whether I should proceed with that.

CodePudding user response:

I finally figured it out. I'm not familiar with Reactor, so it took me a while, but the code below works as expected:

public Disposable consumeMessages(TopicPartition topicPartition) {
    // Subscribe to the topic and rewind every assigned partition to the beginning.
    ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topicPartition.topic()))
            .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));
    KafkaReceiver<Integer, String> kafkaReceiver = KafkaReceiver.create(options);
    return kafkaReceiver
            .receive()
            // Pair each record with the current end offset of topicPartition.
            // cache() is applied to a Mono created per record, so the end offset is looked up again for every record.
            .flatMap(record -> kafkaReceiver.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition))).cache()
                    .map(topicPartitionToLastOffset -> Tuples.of(record, topicPartitionToLastOffset.get(topicPartition))))
            // endOffsets returns the offset of the next record to be written, so the last existing record is at end offset - 1.
            // takeUntil still emits the matching record and then completes the Flux.
            .takeUntil(recordAndLastOffset -> recordAndLastOffset.getT1().offset() >= (recordAndLastOffset.getT2() - 1))
            .subscribe(record -> {
                ReceiverOffset offset = record.getT1().receiverOffset();
                System.out.printf("Received message: topic-partition=%s offset=%d key=%d value=%s\n",
                        offset.topicPartition(),
                        offset.offset(),
                        record.getT1().key(),
                        record.getT1().value());
                offset.acknowledge();
            });
}
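
Two edge cases are worth keeping in mind with this approach. takeUntil only evaluates its predicate when a record arrives, so an empty partition would never complete the Flux. The code also checks a single TopicPartition, although the subscription covers the whole topic. Below is a minimal sketch of the stop condition that handles both, written in plain Java without Kafka dependencies so the logic can be tested in isolation. The class name EndOffsetTracker, its methods, and the use of plain int partition ids are hypothetical and are not part of reactor-kafka. It assumes the beginning and end offsets are snapshotted once before consuming.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: tracks, per partition, whether every record up to a
 * snapshot of the end offsets has been seen. Partitions are plain ints here.
 */
public class EndOffsetTracker {
    // partition -> offset of the last record that still has to be consumed
    private final Map<Integer, Long> remaining = new HashMap<>();

    /**
     * @param beginningOffsets first available offset per partition
     * @param endOffsets       offset the next written record would get, per partition
     */
    public EndOffsetTracker(Map<Integer, Long> beginningOffsets, Map<Integer, Long> endOffsets) {
        endOffsets.forEach((partition, end) -> {
            long begin = beginningOffsets.getOrDefault(partition, 0L);
            // A partition with end == begin holds no records, so there is nothing to wait for.
            if (end > begin) {
                remaining.put(partition, end - 1);
            }
        });
    }

    /** Registers one consumed offset; returns true once every partition has reached its target. */
    public boolean recordAndCheckDone(int partition, long offset) {
        Long last = remaining.get(partition);
        if (last != null && offset >= last) {
            remaining.remove(partition);
        }
        return remaining.isEmpty();
    }

    /** True when nothing is left to consume, e.g. all partitions were empty at snapshot time. */
    public boolean isDone() {
        return remaining.isEmpty();
    }

    public static void main(String[] args) {
        // Partition 0 holds offsets 0..2, partition 1 is empty.
        EndOffsetTracker tracker = new EndOffsetTracker(Map.of(0, 0L, 1, 0L), Map.of(0, 3L, 1, 0L));
        for (long offset = 0; offset < 3; offset++) {
            System.out.println("offset " + offset + " done=" + tracker.recordAndCheckDone(0, offset));
        }
    }
}
```

In the reactive pipeline, a tracker like this could be used as the takeUntil predicate, with record.partition() and record.offset() as arguments. If isDone() is already true after the snapshot, the subscription can be skipped entirely.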