we are using Spring Cloud Stream to listen to rabbitMQ multiple queues, especially the SCF model
- The spring-cloud-stream-reactive module is deprecated in favor of native support via Spring Cloud Function programming model.
by the time there was a single node/host it was working good (application.yml snippet shared below),
however the moment we try to connect multiple nodes it is failing, Can someone guide how to connect the same or have some sample related to Spring Cloud Documentation
Following Code is working as expected
spring:
cloud:
stream:
function:
definition: function1;function2;function3
bindings:
function1-in-0:
group: allocation
destination: destinationExchange
binder: rabbit
function2-in-0:
group: allocation
destination: destinationExchange
binder: rabbit
function3-in-0:
group: allocation
destination: destinationExchange
binder: rabbit
rabbit:
bindings:
function1-in-0:
consumer:
bindingRoutingKey: routing.key.1
function2-in-0:
consumer:
bindingRoutingKey: routing.key.2
function3-in-0:
consumer:
bindingRoutingKey: routing.key.3
binder:
nodes: address1
Basically it need to be something like following
spring:
cloud:
stream:
function:
definition: function1;function2;function3
bindings:
function1-in-0:
group: allocation
destination: destinationExchange
binder: rabbit1
function2-in-0:
group: allocation
destination: destinationExchange
binder: rabbit2
function3-in-0:
group: allocation
destination: destinationExchange
binder: rabbit3
binder:
rabbit1:
function1-in-0:
consumer:
bindingRoutingKey: routing.key.1
binder:
nodes: address1
rabbit2:
function2-in-0:
consumer:
bindingRoutingKey: routing.key.2
binder:
nodes: address2
rabbit3:
function3-in-0:
consumer:
bindingRoutingKey: routing.key.3
binder:
nodes: address3
with following addition itself
binders:
rabbit1:
type: rabbit
environment:
spring.spring.cloud.stream.kafka:
binder:
nodes: localhost
i am getting this error
o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: rabbit
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.8.jar:5.3.8]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.2.jar:2.5.2]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332) ~[spring-boot-2.5.2.jar:2.5.2]
at com.gap.pem.Application.main(Application.java:14) ~[main/:na]
Caused by: java.lang.IllegalStateException: Unknown binder configuration: rabbit
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.8.jar:5.3.8]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:255) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:224) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:152) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:386) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:103) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.8.jar:5.3.8]
... 14 common frames omitted
Process finished with exit code 1
we have following dependencies available
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
CodePudding user response:
Upon adding the binders config for both rabbit1 and rabbit2 it resolved the issue:
Below is the sample config which I tried and was able to consume messages successfully
spring:
cloud:
stream:
function:
definition: processFirstConsumer;processSecondConsumer
bindings:
processFirstConsumer-in-0:
group: allocation
destination: userMessage1
binder: rabbit1
processSecondConsumer-in-0:
group: allocation
destination: userMessage2
binder: rabbit2
binder:
rabbit1:
processFirstConsumer-in-0:
consumer:
bindingRoutingKey: routing.key.1
binder:
nodes: address1
rabbit2:
processSecondConsumer-in-0:
consumer:
bindingRoutingKey: routing.key.2
binder:
nodes: address2
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /