Java - Spring Boot KafkaListener stops consuming messages after running for some time

Time:10-13

I have a Spring Boot application with several Kafka consumers (@KafkaListener) running on Confluent Kafka topics that have 8 partitions. Each consumer's concurrency is set to 1. The topics are loaded from a file with about a million lines of messages. The consumers consume them in batches to validate, process, and update a database.
The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.
Update 06/04: here is the consumer factory with its settings. It is Spring-Kafka 1.3.1. The Confluent Kafka broker is version

    @Bean
    public ConsumerFactory<String, ListingMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        // Up to 10000 records can be handed to the listener per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(ListingMessage.class));
    }

    @Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
    public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
            ConsumerFactory<String, ListingMessage> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        // Containers are started and stopped manually, not at application startup.
        factory.setAutoStartup(false);
        // The listener receives the whole batch returned by each poll.
        factory.setBatchListener(true);
        return factory;
    }

Note: auto-startup on the container factory is set to false. This lets me start and stop the consumers manually while a large file is being loaded.
After running for about 1 hour (the time varies), the consumers stop consuming messages from their topic even though the topic still has many messages available. There is a log statement in the consume method, and it stops appearing in the logs.
I track the status of the consumers with the "./kafka-consumer-groups" command, and after some time I see that there are no consumers left in this group.

    $ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name

There are no errors in the log for this consumer failure. The consumer method is enclosed in a try-catch block, so it would catch any exception thrown while processing the messages.
How can we design the Spring-Kafka consumer so that it restarts when it stops consuming? Is there a listener that can log the exact point at which a consumer stops? Is this happening because the concurrency is set to 1? I had to set the concurrency to 1 because there are other consumers that would slow down if this consumer had more concurrency.
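
One way to make a silent stop visible is to record when the listener last processed a batch and check that timestamp periodically. The following is a minimal sketch in plain Java, not a Spring Kafka API: the class `StallWatchdog`, its method names, and the 10-minute threshold are all hypothetical. In a Spring application the restart action could, for example, look up the container through `KafkaListenerEndpointRegistry` and call `stop()` and then `start()` on it.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of Spring Kafka): detects a listener that has gone quiet. */
final class StallWatchdog {

    private final long thresholdNanos;
    private final AtomicLong lastActivityNanos;

    StallWatchdog(Duration threshold, long nowNanos) {
        this.thresholdNanos = threshold.toNanos();
        this.lastActivityNanos = new AtomicLong(nowNanos);
    }

    /** Call from the listener method each time a batch has been processed. */
    void recordActivity(long nowNanos) {
        lastActivityNanos.set(nowNanos);
    }

    /** True when no batch has been recorded for longer than the threshold. */
    boolean isStalled(long nowNanos) {
        return nowNanos - lastActivityNanos.get() > thresholdNanos;
    }

    /** Runs the restart action if the listener is stalled, then resets the clock. */
    boolean checkAndRestart(long nowNanos, Runnable restartAction) {
        if (!isStalled(nowNanos)) {
            return false;
        }
        restartAction.run();
        recordActivity(nowNanos);
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Assumed threshold of 10 minutes; pick a value that fits the workload.
        StallWatchdog watchdog = new StallWatchdog(Duration.ofMinutes(10), start);

        // Simulate a check 11 minutes after the last recorded batch.
        long later = start + Duration.ofMinutes(11).toNanos();
        boolean restarted = watchdog.checkAndRestart(later,
                () -> System.out.println("No batches for over 10 minutes; restarting container"));
        System.out.println("restarted: " + restarted);
    }
}
```

A quiet listener is not always a failed one: the topic may simply be empty, so a check like this is best combined with the lag reported by `kafka-consumer-groups`. Spring Kafka can also publish a `ListenerContainerIdleEvent` when an idle interval is configured on the container, which may serve a similar purpose depending on the version in use.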

CodePudding user response:

I just ran a test with a 30-second poll interval (max.poll.interval.ms=30000), suspended the listener, and resumed it after 30 seconds; I see this in the log...

2018-06-04 18:35:59.361  INFO 4191 --- [foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo

2018-06-04 18:37:07.347 ERROR 4191 --- [foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

2018-06-04 18:37:07.350  INFO 4191 --- [foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so50687794-0]
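
The exception message above ties the failure to the time between calls to poll(), so it can help to work out how much processing time each record gets when a poll returns a full batch. The sketch below does that arithmetic in plain Java. It assumes the Kafka client default of 5 minutes for max.poll.interval.ms, since the question does not set that property, and the 50 ms average processing cost is a made-up figure for illustration; `PollBudget` and its methods are hypothetical names.

```java
import java.time.Duration;

/** Rough per-record time budget implied by max.poll.interval.ms and max.poll.records. */
final class PollBudget {

    /** Assumed: the Kafka client default for max.poll.interval.ms (300000 ms). */
    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofMinutes(5);

    /** Average milliseconds available per record when a poll returns a full batch. */
    static long perRecordBudgetMillis(Duration maxPollInterval, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return maxPollInterval.toMillis() / maxPollRecords;
    }

    /** True if a full batch at the given average cost takes longer than the interval. */
    static boolean wouldExceedInterval(Duration maxPollInterval, int maxPollRecords,
                                       long avgMillisPerRecord) {
        long batchMillis = Math.multiplyExact((long) maxPollRecords, avgMillisPerRecord);
        return batchMillis > maxPollInterval.toMillis();
    }

    public static void main(String[] args) {
        int maxPollRecords = 10_000; // max.poll.records from the question

        long budget = perRecordBudgetMillis(DEFAULT_MAX_POLL_INTERVAL, maxPollRecords);
        System.out.println("Budget per record (ms): " + budget);

        // Hypothetical cost: 50 ms per record for validation plus a database update.
        boolean tooSlow = wouldExceedInterval(DEFAULT_MAX_POLL_INTERVAL, maxPollRecords, 50);
        System.out.println("Full batch exceeds the poll interval: " + tooSlow);
    }
}
```

Under these assumptions a full batch of 10000 records leaves about 30 ms per record, so a database update that averages more than that would push the consumer past the interval and trigger the rebalance shown in the log. Lowering max.poll.records or raising max.poll.interval.ms both widen the budget. In client versions that have max.poll.interval.ms, heartbeats are sent from a background thread, so session.timeout.ms alone does not bound the time between polls.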