In Spring Boot, with the following
application.yml:
kafka:
  bootstrap-servers: localhost:9092
  listener:
    concurrency: 10
    ack-mode: MANUAL
  producer:
    topic: test-record
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 3
    orn-record:
      timeout: 3
    #acks: 1
  consumer:
    groupId: test-record
    topic: test
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
With the configuration above, we can avoid Java (bean-based) configuration in Spring Boot, which is a significant advantage.
Q: Can we configure a Kafka error handler and the number of consumer retries from application.properties / application.yml?
I could not find any reference or documentation about this, so I am hoping for a definitive answer. Because of this one issue I now have to switch to Java-based configuration in Spring Boot and remove the properties-based configuration, which feels like going back to the old Spring way. I believe there should be a workaround that achieves this through property-file configuration.
CodePudding user response:
Consumers don't have a retry property. If the offsets were not committed, the records are read again from the last committed offset once the consumer restarts or the group rebalances; within a running consumer, a seek is needed to re-read them.
There is also no configurable error-handling class that sits outside deserialization, as there is in Kafka Streams.
If you want to handle deserialization errors, as opposed to processing errors, you can set that up like so:
spring:
  kafka:
    bootstrap-servers: ...
    consumer:
      # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # Delegate deserializers
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
Beyond that, you can use DeadLetterPublishingRecoverer together with SeekToCurrentErrorHandler (superseded by DefaultErrorHandler in Spring for Apache Kafka 2.8 and later) in code to publish the failed records to a new topic for inspection and further processing.
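As a rough sketch of that code-based approach, the configuration class below wires a DeadLetterPublishingRecoverer into a DefaultErrorHandler and reads the retry count and interval from the property file, so the values at least stay in application.yml. It assumes Spring Boot 2.6+ with Spring for Apache Kafka 2.8+, where a single error handler bean of this type is picked up by the auto-configured listener container factory. The class name and the `app.kafka.retry.*` property keys are hypothetical and are not built-in Spring Boot properties.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaOperations<Object, Object> template,
            // Hypothetical custom keys, with defaults used when they are absent.
            @Value("${app.kafka.retry.max-retries:3}") long maxRetries,
            @Value("${app.kafka.retry.interval-ms:1000}") long intervalMs) {

        // Once retries are exhausted, the failed record is published to a
        // dead-letter topic (by default the original topic name plus ".DLT").
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        // Redeliver a failed record up to maxRetries times, waiting intervalMs between attempts.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(intervalMs, maxRetries));
    }
}
```

With this in place, adding `app.kafka.retry.max-retries: 5` to application.yml would change the retry count without touching the Java code, under the assumptions stated above.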
CodePudding user response:
How about the following?
server:
  port: 8081
spring:
  kafka:
    # Kafka server information
    bootstrap-servers:
      - 172.17.19.49:9093
      - 172.17.19.60:9093
      - 172.17.19.63:9093
    listener:
      # Number of threads to run in the listener containers.
      # The default partition distribution may not be as expected when listening to multiple topics.
      # For example, if you have three topics with five partitions each and you want to use concurrency=15,
      # you see only five active consumers, each assigned one partition from each topic,
      # with the other 10 consumers being idle.
      # This is because the default Kafka PartitionAssignor is the RangeAssignor.
      # For this scenario, you may want to consider using the RoundRobinAssignor instead,
      # which distributes the partitions across all of the consumers.
      # Then, each consumer is assigned one topic or partition.
      # (Ref. [strategy])
      concurrency: 2
      # Offset commit method
      # - RECORD: commit the offset after processing each record
      # - BATCH: commit offsets after processing all records returned by a poll
      # - TIME: commit pending offsets after [ack-time] has elapsed
      # - COUNT: commit pending offsets after [ack-count] has been exceeded
      # - COUNT_TIME: commit pending offsets after [ack-time] has elapsed or [ack-count] has been exceeded
      # - MANUAL: (for aggregating multiple replies) The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as [BATCH] are applied.
      # - MANUAL_IMMEDIATE: (for aggregating multiple replies) Commit the offset immediately when the "Acknowledgment.acknowledge()" method is called by the listener.
      ack-mode: MANUAL_IMMEDIATE
      # Timeout to use when polling (fetching) from the consumer
      # (Unit: milliseconds, Default value: 5000)
      poll-timeout: 5000
    # Use [properties] when authentication is required for the connection.
    # If authentication is not required, delete the [properties] block below.
    properties:
      sasl:
        # Use instead of a [.jaas] file
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_PLAINTEXT
    consumer:
      # Group id of the consumer client.
      # It can also be set directly with the @KafkaListener annotation option.
      # ex) @KafkaListener(topics = "TOPIC", groupId = "GROUP_ID")
      group-id: test-consumer
      # Client id
      client-id: test-consumer-1
      # When set to true, the consumer's offsets are periodically committed in the background
      # - true
      # - false
      # Note: when true, the listener container leaves commits to the Kafka client,
      # so [ack-mode] above does not drive the commits.
      enable-auto-commit: true
      # (Optional) Frequency at which consumer offsets are automatically committed to Kafka when [enable-auto-commit] is true
      # (Unit: milliseconds)
      auto-commit-interval: 100
      # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
      # (Offset reset method used when offset information is lost)
      # - earliest: reset the offset to the oldest message
      # - latest: reset the offset to the most recently produced message
      # - none: throw an exception to the consumer if no previous offset is found for the consumer's group
      # - anything else: throw an exception to the consumer
      auto-offset-reset: latest
      # The minimum amount of data the server should return for a fetch request.
      # If insufficient data is available, the request waits for that much data to accumulate before it is answered.
      # If set to 1 byte, the fetch is answered as soon as a single byte of data is available or the fetch request times out.
      # (Unit: bytes, Default value: 1)
      fetch-min-size: 1024
      # The maximum amount of time the server blocks before answering a fetch request
      # when there is not enough data to satisfy [fetch-min-size].
      # (Unit: milliseconds, Default value: 500)
      fetch-max-wait: 500
      # Maximum number of records returned in a single call to poll()
      # (Default value: 500)
      max-poll-records: 10
      # Key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        partition:
          assignment:
            # Kafka Consumer built-in strategies.
            # - org.apache.kafka.clients.consumer.RangeAssignor: (default strategy)
            # - org.apache.kafka.clients.consumer.RoundRobinAssignor
            # - org.apache.kafka.clients.consumer.StickyAssignor
            # (Ref. https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3)
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
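To make the comment on concurrency more concrete, here is a small plain-Java sketch of the three-topics, five-partitions, concurrency=15 example. It is a simplified model of range-style and round-robin-style assignment written for illustration, not the Kafka assignor code itself; the class and method names are made up, and it assumes every consumer subscribes to every topic.

```java
import java.util.Arrays;

// Simplified, hypothetical model of two partition assignment styles.
final class AssignorSketch {

    // Range style: each topic is divided over the consumers independently,
    // so the first consumers in the ordering receive the partitions of every topic.
    public static int[] rangeAssign(int topics, int partitionsPerTopic, int consumers) {
        int[] assigned = new int[consumers]; // partition count held by each consumer
        for (int t = 0; t < topics; t++) {
            int perConsumer = partitionsPerTopic / consumers;
            int withExtra = partitionsPerTopic % consumers;
            for (int c = 0; c < consumers; c++) {
                assigned[c] += perConsumer + (c < withExtra ? 1 : 0);
            }
        }
        return assigned;
    }

    // Round-robin style: all topic partitions are laid out in one list
    // and handed to the consumers in turn.
    public static int[] roundRobinAssign(int topics, int partitionsPerTopic, int consumers) {
        int[] assigned = new int[consumers];
        int next = 0;
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assigned[next % consumers]++;
                next++;
            }
        }
        return assigned;
    }

    // Number of consumers that hold at least one partition.
    public static long activeConsumers(int[] assigned) {
        return Arrays.stream(assigned).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        int[] range = rangeAssign(3, 5, 15);
        int[] roundRobin = roundRobinAssign(3, 5, 15);
        System.out.println("range active consumers: " + activeConsumers(range));
        System.out.println("round-robin active consumers: " + activeConsumers(roundRobin));
    }
}
```

In this model the range style leaves 5 of the 15 consumers active, each holding one partition from each topic, while the round-robin style gives each of the 15 consumers exactly one partition, which matches the behaviour described in the comment.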