High-level overview: different types of events are published to the same Kinesis stream: OrderCreated, OrderUpdated, CustomerCreated, etc.
Those messages are to be consumed by a Spring Cloud Stream application by writing Consumer functions and exposing them as beans (functional style).
If the messages used a 'generic' format like the following, I could expose a single java.util.function.Consumer as the message handler to process all of them:
Message {
    String type;
    Object payload;
}
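As a rough illustration of that single-handler approach, here is a minimal plain-Java sketch in which one Consumer inspects the type field and forwards the payload to a per-type handler. The names EventEnvelope and GenericConsumerSketch are hypothetical (they are not from the question or from Spring), and in a real application the returned Consumer would be the one exposed as a bean.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical envelope mirroring the 'generic' Message shape above.
final class EventEnvelope {
    private final String type;
    private final Object payload;

    EventEnvelope(String type, Object payload) {
        this.type = type;
        this.payload = payload;
    }

    String type() { return type; }
    Object payload() { return payload; }
}

class GenericConsumerSketch {
    // Builds one Consumer that forwards each envelope to the handler registered for its type.
    static Consumer<EventEnvelope> dispatching(Map<String, Consumer<Object>> handlers) {
        return envelope -> {
            Consumer<Object> handler = handlers.get(envelope.type());
            if (handler == null) {
                // Unknown types are rejected here; ignoring or dead-lettering them is another option.
                throw new IllegalArgumentException("No handler for event type: " + envelope.type());
            }
            handler.accept(envelope.payload());
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Map<String, Consumer<Object>> handlers = new HashMap<>();
        handlers.put("OrderCreated", payload -> log.add("order:" + payload));
        handlers.put("CustomerCreated", payload -> log.add("customer:" + payload));

        Consumer<EventEnvelope> consumer = dispatching(handlers);
        consumer.accept(new EventEnvelope("OrderCreated", "o-1"));
        consumer.accept(new EventEnvelope("CustomerCreated", "c-7"));
        System.out.println(log);
    }
}
```

The drawback of this shape is that the payload stays an Object, so each handler has to convert it to its own event class itself.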
However, I want to know if I can have different message handlers/consumers to process each type of event from the same stream name, something like this:
@Bean
Consumer<OrderCreated> processOrderCreatedEvent(SomeReceiver receiver) {
    return receiver::receive;
}

@Bean
Consumer<CustomerCreated> processCustomerCreatedEvent(SomeOtherReceiver receiver) {
    return receiver::receive;
}
spring:
  cloud:
    function:
      definition: processOrderCreatedEvent;processCustomerCreatedEvent
    stream:
      bindings:
        processOrderCreatedEvent-in-0:
          destination: 'events-stream-name'
        processCustomerCreatedEvent-in-0:
          destination: 'events-stream-name'
I have not been able to make this work, so I am not sure whether it is possible.
CodePudding user response:
I think you are looking for the Event Routing feature - https://docs.spring.io/spring-cloud-stream/docs/3.2.5-SNAPSHOT/reference/html/spring-cloud-stream.html#_event_routing
You can have an individual function for each event type and then have the routing function consume everything and distribute the messages among those functions based on your routing instructions.
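To make the idea concrete, here is a plain-Java model of what such a router does: it is the single entry point for every message, resolves a target function name from a routing rule, and delegates to that function. This is only a sketch of the concept, not Spring's implementation; RoutingSketch and the 'event_type' header are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of event routing; none of these types are Spring classes.
class RoutingSketch {
    // Stand-in for the function catalog: function (bean) name -> handler.
    private final Map<String, Consumer<String>> functions = new HashMap<>();
    // Stand-in for the routing instruction: message headers -> target function name.
    private final Function<Map<String, String>, String> routingRule;

    RoutingSketch(Function<Map<String, String>, String> routingRule) {
        this.routingRule = routingRule;
    }

    void register(String name, Consumer<String> function) {
        functions.put(name, function);
    }

    // The router receives every message, resolves a function name, then delegates.
    void route(Map<String, String> headers, String payload) {
        String target = routingRule.apply(headers);
        Consumer<String> function = functions.get(target);
        if (function == null) {
            throw new IllegalStateException("No function named: " + target);
        }
        function.accept(payload);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Assumed convention: header event_type=OrderCreated maps to processOrderCreatedEvent.
        RoutingSketch router = new RoutingSketch(h -> "process" + h.get("event_type") + "Event");
        router.register("processOrderCreatedEvent", p -> log.add("order handler got " + p));
        router.register("processCustomerCreatedEvent", p -> log.add("customer handler got " + p));

        router.route(Map.of("event_type", "CustomerCreated"), "{\"id\":7}");
        System.out.println(log);
    }
}
```

In this model only the router reads from the input; the per-event functions are never attached to the stream themselves.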
CodePudding user response:
I was able to make it work by providing a spring.cloud.function.routing-expression, as described in the Event Routing documentation Oleg linked. However, I had to change the destination from 'events-stream-name' to 'functionRouter-in-0', something like this:
spring:
  cloud:
    function:
      routing-expression: "T(java.lang.System).nanoTime() % 2 == 0 ? 'processOrderCreatedEvent' : 'processCustomerCreatedEvent'"
    stream:
      function:
        routing:
          enabled: true
      bindings:
        processOrderCreatedEvent-in-0:
          destination: 'functionRouter-in-0'
          content-type: application/json
        processCustomerCreatedEvent-in-0:
          destination: 'functionRouter-in-0'
          content-type: application/json
Routing does not work unless the events are published to and consumed from 'functionRouter-in-0'.
Obviously, 'functionRouter-in-0' is not the destination the publisher application will write to. So my question is: do I need another function that takes the events from 'events-stream-name' and forwards them to 'functionRouter-in-0'? Is this the right pattern?
For testing purposes, I am producing the events to 'functionRouter-in-0' with StreamBridge.