Why is Kafka server limited to 100K messages?


I have an out-of-the-box Kafka server and the following producer script:

#!/usr/bin/perl

use Net::Kafka::Producer;
use AnyEvent;

my $condvar     = AnyEvent->condvar;
my $producer    = Net::Kafka::Producer->new(
    'bootstrap.servers' => 'localhost:9092'
);

for ( my $index = 1 ; ; $index++ ) {
    my $msg = "message: " . $index;
    $producer->produce(
        payload => $msg,
        topic   => "tracked-coords"
    )->then(sub {
        my $delivery_report = shift;
        $condvar->send;
        print "Message successfully delivered with offset " . $delivery_report->{offset} . "\n";
    }, sub {
        my $error = shift;
        $condvar->send;
        die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
    });

}

Why does the Kafka server stop at 100K messages?

EDIT

The server stops reporting that it is receiving messages, and the consumer stops receiving messages as well.

EDIT

The Kafka server logs this (at the end):

message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)

Here is the code for the consumer:

#!/usr/bin/perl

use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;

my $consumer    = Net::Kafka::Consumer->new(
    'bootstrap.servers'     => 'localhost:9092',
    'group.id'              => 'mock_data',
    'enable.auto.commit'    => 'true',
);

$consumer->subscribe( [ "tracked-coords"] );

while (1) {
  my $msg = $consumer->poll(1000);
  if ($msg) {
    $consumer->commit();
    say "====================================================================";
    if ( $msg->err ) {
      say "Error: ", Net::Kafka::Error::to_string( $msg->err );
    } else {
      say $msg->payload;
    }
  }
}

And the consumer stops at 100K messages.

CodePudding user response:

Since you're using Net::Kafka, which is built on the librdkafka library, this is most likely the queue.buffering.max.messages setting, which defaults to 100,000. Its documentation says:

Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. See: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html

Try setting it to a low number in your Net::Kafka::Producer->new() call to see whether the producer cuts out sooner. The setting accepts values up to 10 million. Notably, it does not appear among the Kafka broker's own settings, so it looks like a client-side (edenhill/librdkafka) setting only.
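As a sketch of what that would look like — assuming, as the Net::Kafka docs suggest, that unknown keys passed to the constructor go straight through to librdkafka:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Net::Kafka::Producer;

# Sketch only: Net::Kafka forwards config keys it does not handle
# itself directly to librdkafka, so the producer-queue limit can be
# raised here (or lowered, to reproduce the stall sooner).
my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers'            => 'localhost:9092',
    # librdkafka default is 100000; the allowed range goes up to 10M
    'queue.buffering.max.messages' => 1_000_000,
);
```

Raising the limit only buys headroom, though: if the script enqueues faster than the broker acknowledges deliveries, the queue will eventually fill at whatever ceiling you set.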
