Trying to read the batch messages for CloudEvents from cloud stream with Kafka binder. If I use any custom class with custom serializer/deserializer it is working fine but with cloudevents the messages are not coming.
spring
cloud:
function.definition: consumer
stream:
bindings:
producer-out-0:
destination: audit
group: audit-producer
producer:
useNativeEncoding: true
consumer-in-0:
destination: audit
group: audit-consumer
consumer:
batch-mode: true
useNativeDecoding: true
kafka:
binder:
brokers: localhost:9092
consumer-properties:
max.poll.records: 5
fetch.min.bytes: 10000
fetch.max.wait.ms: 10000
bindings:
producer-out-0:
producer:
configuration:
cloudevents:
serializer:
encoding: STRUCTURED
event_format: application/cloudevents json
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
value.serializer: io.cloudevents.kafka.CloudEventSerializer
consumer-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
value.deserializer: io.cloudevents.kafka.CloudEventDeserializer
And Consumer I tried with both List<?> and with single
@Bean
public Consumer<List<CloudEvent>> consumer() {
System.out.println("inside consumer");
//return auditMessage -> System.out.println("data at loop--" thread " -- " auditMessage);
return s -> s.forEach(auditMessage -> System.out.println("data at loop--" thread " -- " auditMessage));
}
If I just use Consumer I am getting the following error, it means the deserialization is happening but somehow message is not coming to consumer.
2022-10-26 20:31:24.070 WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]
CodePudding user response:
It appears you are using Cloud Events SDK, not sure what value you are getting from it as you can do everything (and more) without it. here are the two posts where I discuss it:
https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1 https://spring.io/blog/2020/12/23/cloud-events-and-spring-part-2
IN any event if you still want to rely on SDK types like CloudEvent you may be missing dependency:
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-spring</artifactId>
<version>2.3.0</version>
</dependency>
Also we have SDK-based sample, so that may help. https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml
Otherwise the best thing you can do is create a small app that reproduces the issue, push it to github and send post a link so we can tale a look.