Home > Back-end >  Spring Cloud Stream connect to multiple hosts for single binder (RabbitMQ)
Spring Cloud Stream connect to multiple hosts for single binder (RabbitMQ)

Time:02-23

we are using Spring Cloud Stream to listen to rabbitMQ multiple queues, especially the SCF model

  • The spring-cloud-stream-reactive module is deprecated in favor of native support via Spring Cloud Function programming model.

by the time there was a single node/host it was working good (application.yml snippet shared below),

however the moment we try to connect multiple nodes it is failing, Can someone guide how to connect the same or have some sample related to Spring Cloud Documentation

Following Code is working as expected

spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit
      rabbit:
        bindings:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
        binder:
          nodes: address1

Basically it need to be something like following

spring:
  cloud:
    stream:
      function:
        definition: function1;function2;function3
      bindings:
        function1-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit1
        function2-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit2
        function3-in-0:
          group: allocation
          destination: destinationExchange
          binder: rabbit3
      binder:
        rabbit1:
          function1-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          function2-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
        rabbit3:
          function3-in-0:
            consumer:
              bindingRoutingKey: routing.key.3
          binder:
            nodes: address3

with following addition itself

binders:
    rabbit1:
      type: rabbit
      environment:
        spring.spring.cloud.stream.kafka:
          binder:
            nodes: localhost

i am getting this error

o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: rabbit
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.8.jar:5.3.8]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.2.jar:2.5.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332) ~[spring-boot-2.5.2.jar:2.5.2]
    at com.gap.pem.Application.main(Application.java:14) ~[main/:na]
Caused by: java.lang.IllegalStateException: Unknown binder configuration: rabbit
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.8.jar:5.3.8]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:255) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:224) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:152) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:386) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:103) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.8.jar:5.3.8]
    ... 14 common frames omitted


Process finished with exit code 1

we have following dependencies available

implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'

CodePudding user response:

Upon adding the binders config for both rabbit1 and rabbit2 it resolved the issue:

Below is the sample config which I tried and was able to consume messages successfully

 spring:
  cloud:
    stream:
      function:
        definition: processFirstConsumer;processSecondConsumer
      bindings:
        processFirstConsumer-in-0:
          group: allocation
          destination: userMessage1
          binder: rabbit1
        processSecondConsumer-in-0:
          group: allocation
          destination: userMessage2
          binder: rabbit2
      binder:
        rabbit1:
          processFirstConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.1
          binder:
            nodes: address1
        rabbit2:
          processSecondConsumer-in-0:
            consumer:
              bindingRoutingKey: routing.key.2
          binder:
            nodes: address2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
  • Related