Home > OS >  Batch Consumer not working with Kafka for CloudEvents using Spring Cloud Stream
Batch Consumer not working with Kafka for CloudEvents using Spring Cloud Stream

Time:10-27

Trying to read the batch messages for CloudEvents from cloud stream with Kafka binder. If I use any custom class with custom serializer/deserializer it is working fine but with cloudevents the messages are not coming.

spring
  cloud:
    function.definition: consumer
    stream:
      bindings:
        producer-out-0:
          destination: audit
          group: audit-producer
          producer:
            useNativeEncoding: true
        consumer-in-0:
          destination: audit
          group: audit-consumer
          consumer:
            batch-mode: true
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
          consumer-properties:
            max.poll.records: 5
            fetch.min.bytes: 10000
            fetch.max.wait.ms: 10000
        bindings:
          producer-out-0:
            producer:
              configuration:
                cloudevents:
                  serializer:
                    encoding: STRUCTURED
                    event_format: application/cloudevents json
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
#                value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
                value.serializer: io.cloudevents.kafka.CloudEventSerializer
          consumer-in-0:
            consumer:
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
#                value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
                value.deserializer: io.cloudevents.kafka.CloudEventDeserializer

And Consumer I tried with both List<?> and with single

@Bean
public Consumer<List<CloudEvent>> consumer() {
  System.out.println("inside consumer");
  //return auditMessage -> System.out.println("data at loop--"   thread   " -- "   auditMessage);
  return s -> s.forEach(auditMessage -> System.out.println("data at loop--"   thread   " -- "   auditMessage));
}

If I just use Consumer I am getting the following error, it means the deserialization is happening but somehow message is not coming to consumer.

2022-10-26 20:31:24.070  WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]

CodePudding user response:

It appears you are using Cloud Events SDK, not sure what value you are getting from it as you can do everything (and more) without it. here are the two posts where I discuss it:

https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1 https://spring.io/blog/2020/12/23/cloud-events-and-spring-part-2

IN any event if you still want to rely on SDK types like CloudEvent you may be missing dependency:

<dependency>
   <groupId>io.cloudevents</groupId>
   <artifactId>cloudevents-spring</artifactId>
   <version>2.3.0</version>
</dependency>

Also we have SDK-based sample, so that may help. https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml

Otherwise the best thing you can do is create a small app that reproduces the issue, push it to github and send post a link so we can tale a look.

  • Related