I have an out-of-the-box Kafka server and the following producer script:
#!/usr/bin/perl
use Net::Kafka::Producer;
use AnyEvent;

my $condvar = AnyEvent->condvar;

my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers' => 'localhost:9092'
);

# produce messages forever; note the loop itself never waits on $condvar
for ( my $index = 1; ; $index++ ) {
    my $msg = "message: " . $index;
    $producer->produce(
        payload => $msg,
        topic   => "tracked-coords"
    )->then( sub {
        my $delivery_report = shift;
        $condvar->send;
        print "Message successfully delivered with offset " . $delivery_report->{offset};
    }, sub {
        my $error = shift;
        $condvar->send;
        die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
    } );
}
Why does the Kafka server stop at 100K messages?
EDIT
The server stops reporting that it is receiving messages, and the consumer stops receiving messages as well.
EDIT
The Kafka server logs this (at the end):
message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)
Here is the code for the consumer:
#!/usr/bin/perl
use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;

my $consumer = Net::Kafka::Consumer->new(
    'bootstrap.servers'  => 'localhost:9092',
    'group.id'           => 'mock_data',
    'enable.auto.commit' => 'true',
);

$consumer->subscribe( [ "tracked-coords" ] );

while (1) {
    my $msg = $consumer->poll(1000);
    if ($msg) {
        $consumer->commit();    #_message(0, $msg);
        say "====================================================================";
        if ( $msg->err ) {
            say "Error: ", Net::Kafka::Error::to_string( $msg->err );
        } else {
            say $msg->payload;
        }
    }
}
And the consumer also stops at 100K messages.
CodePudding user response:
Since you're using Net::Kafka, which is built on the librdkafka library, it may be the queue.buffering.max.messages setting, which defaults to 100,000. The documentation describes it as:
Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. See: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
Try setting it to a low number, probably in your Net::Kafka::Producer->new() call, to see if the producer cuts out sooner. The setting supports values in the 1-10M range. Oddly, I don't see it among the Kafka server settings, so I'd guess it's an edenhill (librdkafka) client setting only.
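For example, a minimal sketch of that experiment (the value 1000 is an arbitrary assumption, chosen only to make the effect show up quickly; Net::Kafka passes unknown config keys through to librdkafka):

#!/usr/bin/perl
use Net::Kafka::Producer;

# Same producer as in the question, but with the librdkafka producer
# queue limit lowered so the local buffer fills up much sooner.
my $producer = Net::Kafka::Producer->new(
    'bootstrap.servers'            => 'localhost:9092',
    'queue.buffering.max.messages' => 1000,   # hypothetical test value
);

If the producer now stalls (or the rejection callback starts firing with a queue-full style error) at roughly 1,000 messages instead of 100,000, that would confirm the local producer queue is what's filling up: the tight loop enqueues messages faster than deliveries are acknowledged, and the script never waits on the condvar to drain it.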