I have the following application.yml file:
spring:
application:
name: test-app
cloud:
stream:
kafka:
binder:
consumerProperties:
value:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
brokers: localhost:9092
autoCreateTopics: true
replicationFactor: 1
bindings:
listenCloudEvent-in-0:
destination: com.test.cloudevent
group: test-app-group
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer
listenString-in-0:
destination: com.test.string
group: test-app-group
function:
definition: listenCloudEvent;listenString
As far as I know, following properties should overrides the default deserializer but it seems it doesnt work.
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer
I am getting the following error for com.test.couldevent
topic.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of
io.cloudevents.CloudEvent
(no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
If I change the default deserializer to event cloud deserializer, then it works but then the other listener brakes.
Here is my listener function:
@Bean
public Consumer<Flux<Message<CloudEvent>>> listenCloudEvent() {
return inboundMessage -> inboundMessage
.onErrorStop()
.doOnNext(message -> log.info("[{}] Message is received.", message.getPayload().getId()))
.subscribe();
}
I really appreciate if you could help me on this issue. My main aim is having different deserializer for different topics. However, I wasnt able to succeed on it although it is indicated this way to override the default deserializer in spring documentation.
Thank you in advance!
CodePudding user response:
You are missing the kafka.
element in the consumer properties. configuration
is a kafka-specific property, you are setting a common consumer property.
The following properties are available for Kafka consumers only and must be prefixed with
spring.cloud.stream.kafka.bindings.<channelName>.consumer..
Note ...stream.kafka.bindings...
, not ...stream.bindings...
.