The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.
Update 06/04: here are the consumer factory settings. It is Spring Kafka 1.3.1. The Confluent Kafka broker is version
@Bean
public ConsumerFactory<String, ListingMessage> listingConsumerFactory() { // method name lost in the post; inferred from its use below
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory( // method name lost in the post; placeholder
        ConsumerFactory<String, ListingMessage> listingConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(listingConsumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false); // the container is started and stopped manually
    factory.setBatchListener(true);
    return factory;
}
Note: the container factory has auto-startup set to false. This is so the consumer can be started and stopped manually when loading a large file.
After running for about 1 hour (the time varies), the consumer stops consuming messages from its topic even though the topic has many messages available. There is a log statement in the consume method, and it stops printing in the logs.
I track the status of the consumer with the "./kafka-consumer-groups" command, and after some time I see that there are no consumers in this group.
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
There are no errors in the log for this consumer failure. The consumer method is enclosed in a try-catch block, so it would catch any exception thrown while processing the messages.
How should we design the Spring Kafka consumer so that it restarts when it stops consuming? Is there a listener that can log the exact point at which a consumer stops? Is this because the concurrency is set to 1? I had to set the concurrency to 1 because the other consumers slow down if this consumer has more concurrency.
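To make the "log the exact point at which a consumer stops" idea concrete, here is a minimal sketch in plain Java, independent of the Spring Kafka API. The class ConsumerWatchdog and its method names are hypothetical and do not come from the post or from any library: the listener would call recordBatch at the end of each batch, and a separate scheduled task would call isStalled. A stalled result only says that no batch has completed recently; it cannot tell an empty topic from a consumer that has left the group.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: remembers when the listener last finished a batch.
public class ConsumerWatchdog {
    private final Duration maxIdle;
    private final AtomicReference<Instant> lastBatch;

    public ConsumerWatchdog(Duration maxIdle, Instant start) {
        this.maxIdle = maxIdle;
        this.lastBatch = new AtomicReference<>(start);
    }

    // Call at the end of every listener invocation.
    public void recordBatch(Instant now) {
        lastBatch.set(now);
    }

    // True when more than maxIdle has passed since the last completed batch.
    public boolean isStalled(Instant now) {
        return Duration.between(lastBatch.get(), now).compareTo(maxIdle) > 0;
    }

    // The last point at which consumption was known to be working.
    public Instant lastBatchAt() {
        return lastBatch.get();
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2018-06-04T18:35:59Z");
        ConsumerWatchdog watchdog = new ConsumerWatchdog(Duration.ofMinutes(5), t0);
        watchdog.recordBatch(t0.plusSeconds(60));
        System.out.println(watchdog.isStalled(t0.plusSeconds(120))); // false: 1 minute idle
        System.out.println(watchdog.isStalled(t0.plusSeconds(600))); // true: 9 minutes idle
        System.out.println("last batch at " + watchdog.lastBatchAt());
    }
}
```

When the watchdog reports a stall, the monitoring task could log lastBatchAt() and then stop and start the listener container, for example through Spring Kafka's KafkaListenerEndpointRegistry; that wiring is left out here.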
CodePudding user response:
I just ran a test with max.poll.interval.ms=30000 (30 seconds), suspended the listener, and resumed it after 30 seconds; I see this in the log:
2018-06-04 18:35:59.361  INFO 4191 --- [foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
2018-06-04 18:37:07.347 ERROR 4191 --- [foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2018-06-04 18:37:07.350  INFO 4191 --- [foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so50687794-0]
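The exception message ties the failure to the time between poll() calls, so it may help to work out the time budget implied by the question's settings. The sketch below is only arithmetic: it assumes the question's max.poll.records=10000 and, since the question does not set max.poll.interval.ms, the Kafka client default of 300000 ms. The class PollBudget and its methods are hypothetical names used for illustration.

```java
public class PollBudget {

    // Largest average processing time per record (in ms) that still lets a
    // full batch finish before max.poll.interval.ms elapses.
    static double perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    // True if a full batch processed at the given average cost per record
    // would take longer than max.poll.interval.ms.
    static boolean wouldExceed(long maxPollIntervalMs, int maxPollRecords, double avgMsPerRecord) {
        return avgMsPerRecord * maxPollRecords > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // assumed: client default, not set in the question
        int maxPollRecords = 10_000;       // from the question

        // 300000 ms / 10000 records = 30 ms per record
        System.out.println(perRecordBudgetMs(maxPollIntervalMs, maxPollRecords)); // 30.0

        // At 50 ms per record a full batch takes 500000 ms, over the limit
        System.out.println(wouldExceed(maxPollIntervalMs, maxPollRecords, 50.0)); // true
    }
}
```

Under these assumptions, a full batch of 10000 records has to be processed in about 30 ms per record on average; if the listener is slower, the group rebalances and the commit fails as in the log above. Lowering max.poll.records or raising max.poll.interval.ms widens that budget.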