How to configure Kafka consumer retry property from application.properties in spring boot?


In spring-boot,

application.yml:

kafka:
  bootstrap-servers: localhost:9092
  listener:
    concurrency: 10
    ack-mode: MANUAL
  producer:
    topic: test-record
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 3
    orn-record:
      timeout: 3
    #acks: 1
  consumer:
    groupId: test-record
    topic: test
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Using the configuration above, we can avoid Java bean-based configuration in Spring Boot, which is a significant advantage.

Q: Can we configure a Kafka error handler and the number of consumer retries from application.properties / application.yml?

I could not find any reference or documentation about this, so I am hoping for some conclusion. Because of this issue I now have to switch to Java bean-based configuration in Spring Boot and remove the property-based configuration, which means going back to the old way of doing things in Spring. I believe there should be a workaround so that we can achieve this through the property file.

CodePudding user response:

Consumers don't have a retry property. If the offsets were not committed, the next poll will try again from the same offsets.

There is also no error-handling class that can be configured via properties, apart from what you can do at the deserialization level, the way there is in Kafka Streams.

If you want to handle deserialization errors (rather than processing errors), you can set that up like so:

spring:
  kafka:
    bootstrap-servers: ...
    consumer:
      # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    properties:
      # Delegate deserializers
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

Beyond that, you can leverage DeadLetterPublishingRecoverer and SeekToCurrentErrorHandler in code to publish the failed records to a new topic for inspection and further processing.
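
As a rough sketch of how those two pieces fit together (the configuration class name and the app.kafka.consumer.retries property below are made up for illustration; they are not standard Spring Boot properties), you could declare an error handler bean that retries a record a configurable number of times and then publishes it to a <topic>.DLT dead-letter topic:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public SeekToCurrentErrorHandler errorHandler(
            KafkaOperations<Object, Object> template,
            // "app.kafka.consumer.retries" is a custom property, not a Spring Boot one;
            // define it yourself in application.yml to externalize the retry count.
            @Value("${app.kafka.consumer.retries:2}") long retries) {
        // Retry the failed record with a 1-second back-off, then hand it to the
        // recoverer, which publishes it to "<original-topic>.DLT".
        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template),
                new FixedBackOff(1000L, retries));
    }
}

Depending on the Spring Boot version, the auto-configured listener container factory may pick up this ErrorHandler bean automatically; otherwise set it on your ConcurrentKafkaListenerContainerFactory yourself. In newer spring-kafka releases SeekToCurrentErrorHandler has been superseded by DefaultErrorHandler, but the idea is the same.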

CodePudding user response:

How about the following?

server:
  port: 8081

spring:
  kafka:
    # Kafka server information
    bootstrap-servers:
    - 172.17.19.49:9093
    - 172.17.19.60:9093
    - 172.17.19.63:9093
    listener:
      # Number of threads to run in the listener containers.
      # The default partition distribution may not be as expected when listening to multiple topics.
      # For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle.
      # This is because the default Kafka PartitionAssignor is the RangeAssignor.
      # For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers.
      # Then, each consumer is assigned one topic or partition.
      # (Ref. [strategy]) 
      concurrency: 2
      # Offset commit method
      # - RECORD: commit offset after processing each record
      # - BATCH: commit offsets after processing all records
      # - TIME: commit pending offsets after [ack-time] has elapsed
      # - COUNT: commit pending offsets after [ack-count] has been exceeded
      # - COUNT_TIME: commit pending offsets after [ack-time] has elapsed or [ack-count] has been exceeded
      # - MANUAL: the message listener is responsible for calling acknowledge() on the Acknowledgment. After that, the same semantics as [BATCH] are applied.
      # - MANUAL_IMMEDIATE: commit the offset immediately when Acknowledgment.acknowledge() is called by the listener.
      # (A listener sketch using manual acknowledgment is shown after this configuration.)
      ack-mode: MANUAL_IMMEDIATE
      # Timeout used when polling the consumer
      # (Unit: milliseconds, Default value: 5000)
      poll-timeout: 5000
      
    # Use [properties] when authentication is required for the connection.
    # If authentication is not required, remove the [properties] block below.
    properties:
      sasl:
        # Use instead of [.jaas] file
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_PLAINTEXT

    consumer:
      # Group id of the consumer client.
      # It can also be set directly via the @KafkaListener annotation.
      # e.g. @KafkaListener(topics = "TOPIC", groupId = "GROUP_ID")
      group-id: test-consumer
      # client id
      client-id: test-consumer-1
      # When set to true, the consumer's offsets are periodically committed in the background.
      # Because [ack-mode] above is MANUAL_IMMEDIATE (manual acknowledgment), this must be false
      # so that the listener container controls the offset commits.
      enable-auto-commit: false
      # (Optional) Frequency at which consumer offsets are automatically committed to Kafka when [enable-auto-commit] is true
      auto-commit-interval: 100
      # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server
      # (i.e. how the offset is reset when offset information is lost).
      # - earliest: reset the offset to the oldest message
      # - latest: reset the offset to the most recently produced message
      # - none: throw an exception to the consumer if no previous offset is found for the consumer's group
      # - anything else: throw an exception to the consumer
      auto-offset-reset: latest
      # The minimum amount of data the server should return for a fetch request.
      # If insufficient data is available, the request waits for that much data to accumulate before answering.
      # With the default of 1 byte, the request is answered as soon as any data is available (or the fetch wait times out).
      # (Unit: bytes, Default value: 1)
      fetch-min-size: 1024
      # The maximum amount of time the server blocks a fetch request
      # when there is not enough data to satisfy [fetch-min-size].
      # (Unit: milliseconds, Default value: 500)
      fetch-max-wait: 500
      # Maximum number of records returned in a single call to poll()
      # (Default value: 500)
      max-poll-records: 10
      # key/value deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        partition:
          assignment:
            # Kafka Consumer built-in strategies.
            # - org.apache.kafka.clients.consumer.RangeAssignor: (default strategy)
            # - org.apache.kafka.clients.consumer.RoundRobinAssignor
            # - org.apache.kafka.clients.consumer.StickyAssignor
            # (Ref. https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3)
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
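
For completeness, a listener that matches the manual acknowledgment settings above could look roughly like this (the topic name "TOPIC" and the class name are placeholders; the group id comes from the configuration above):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class TestConsumer {

    // With ack-mode MANUAL_IMMEDIATE (and enable-auto-commit: false), the offset
    // for a record is committed only when acknowledge() is called.
    @KafkaListener(topics = "TOPIC", groupId = "test-consumer")
    public void listen(String message, Acknowledgment ack) {
        System.out.println("Received: " + message);
        ack.acknowledge();  // commit this record's offset immediately
    }
}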
