Spring Cloud Kafka Consumer Properties not working

Time:11-17

I have the following application.yml file:

spring:
  application:
    name: test-app
  cloud:
    stream:
      kafka:
        binder:
          consumerProperties:
            value:
              deserializer: org.apache.kafka.common.serialization.StringDeserializer
          brokers: localhost:9092
          autoCreateTopics: true
          replicationFactor: 1
      bindings:
        listenCloudEvent-in-0:
          destination: com.test.cloudevent
          group: test-app-group
          consumer:
            configuration:
              value:
                deserializer: io.cloudevents.kafka.CloudEventDeserializer
        listenString-in-0:
          destination: com.test.string
          group: test-app-group
    function:
      definition: listenCloudEvent;listenString

As far as I know, the following properties should override the default deserializer, but they do not seem to take effect.

consumer:
  configuration:
    value:
      deserializer: io.cloudevents.kafka.CloudEventDeserializer

I am getting the following error for the com.test.cloudevent topic:

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of io.cloudevents.CloudEvent (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

If I change the default deserializer to the CloudEvent deserializer, it works, but then the other listener breaks.

Here is my listener function:

@Bean
public Consumer<Flux<Message<CloudEvent>>> listenCloudEvent() {
    return inboundMessage -> inboundMessage
            .onErrorStop()
            .doOnNext(message -> log.info("[{}] Message is received.", message.getPayload().getId()))
            .subscribe();
}

I would really appreciate help with this issue. My main aim is to use a different deserializer for each topic. However, I have not been able to get it working, even though the Spring documentation indicates that this is the way to override the default deserializer.

Thank you in advance!

CodePudding user response:

You are missing the kafka element in the path of the consumer properties. configuration is a Kafka-specific consumer property, but you are setting it under the common (binder-agnostic) consumer properties, so the Kafka binder does not pick it up.

See https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties

The documentation states: "The following properties are available for Kafka consumers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer."

Note that the prefix is ...stream.kafka.bindings..., not ...stream.bindings...
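
As a rough sketch of what that means for the application.yml in the question: the per-binding deserializer moves under spring.cloud.stream.kafka.bindings, while destination and group stay under spring.cloud.stream.bindings. This is an untested rearrangement of the original file, with all names taken from the question; the placement of the configuration block is the only intended change.

```yaml
spring:
  application:
    name: test-app
  cloud:
    stream:
      kafka:
        binder:
          # Default deserializer for every binding that does not override it.
          consumerProperties:
            value:
              deserializer: org.apache.kafka.common.serialization.StringDeserializer
          brokers: localhost:9092
          autoCreateTopics: true
          replicationFactor: 1
        # Kafka-specific binding properties: note the "kafka" level above "bindings".
        bindings:
          listenCloudEvent-in-0:
            consumer:
              configuration:
                value:
                  deserializer: io.cloudevents.kafka.CloudEventDeserializer
      # Common (binder-agnostic) binding properties stay here.
      bindings:
        listenCloudEvent-in-0:
          destination: com.test.cloudevent
          group: test-app-group
        listenString-in-0:
          destination: com.test.string
          group: test-app-group
    function:
      definition: listenCloudEvent;listenString
```

With this layout, listenString-in-0 should keep using the binder-level StringDeserializer, and only listenCloudEvent-in-0 should get the CloudEventDeserializer.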
