Home > other >  Spring Cloud Stream multiple consumers for multiple event types in same kinesis stream
Spring Cloud Stream multiple consumers for multiple event types in same kinesis stream

Time:10-02

High level overview: different types of events are published to the same kinesis stream: OrderCreated, OrderUpdated, CustomerCreated, etc.

Those messages are to be consumed by a Spring Cloud Stream application by writing some Consumer functions and exposing them as beans (Functional style).

I could expose a message handler as java.util.function.Consumer to process all those messages in the scenario the message format was similar to the following 'generic' one:

Message {
    String type;
    Object payload;
}

However, I want to know if I can have different message handlers/consumers to process each type of event from the same stream name, something like this:

@Bean
Consumer<OrderCreated> processOrderCreatedEvent(SomeReceiver receiver) {
    return receiver::receive;
}

@Bean
Consumer<OrderCreated> processCustomerCreatedEvent(SomeOtherReceiver receiver) {
    return receiver::receive;
}

  cloud:
    function:
      definition: processOrderCreatedEvent;processCustomerCreatedEvent
    stream:
      bindings:
        processOrderCreatedEvent-in-0:
          destination: 'events-stream-name'
        processCustomerCreatedEvent-in-0:
          destination: 'events-stream-name'

I have not been able to make this work, so not sure if this is possible.

CodePudding user response:

I think you are looking for Event Routing feature - https://docs.spring.io/spring-cloud-stream/docs/3.2.5-SNAPSHOT/reference/html/spring-cloud-stream.html#_event_routing

You can have individual functions for each event type and than have routing function consume everything and distribute between the functions based on your routing instructions.

CodePudding user response:

I was able to make it work by providing a spring.cloud.function.routing-expression as in the Event Routing documentation link Oleg shared, however, I had to change the destination from 'events-stream-name' to 'functionRouter-in-0', something like this:

spring:
  cloud:
    function:
      routing-expression: "T(java.lang.System).nanoTime() % 2 == 0 ? 'processOrderCreatedEvent' : 'processCustomerCreatedEvent'"
    stream:
      function:
        routing:
          enabled: true
      bindings:
        processOrderCreatedEvent-in-0:
          destination: 'functionRouter-in-0'
          content-type: application/json
        processCustomerCreatedEvent-in-0:
          destination: 'functionRouter-in-0'
          content-type: application/json

If the events are not published/consumed to/from 'functionRouter-in-0', routing is not working.

Obviously, 'functionRouter-in-0' is not the destination where the publisher application will put the events, my question is, do I need another function which takes the events from 'events-stream-name' and sends them to 'functionRouter-in-0'? Is this the right pattern?

For the sake of testing, I am producing the events with StreamBridge to 'functionRouter-in-0'.

  • Related