Incorrectly invoked stream listeners on RabbitMQ message consumer with spring-cloud-stream

Time:09-28

I have a project that uses spring-cloud-starter-parent:Hoxton.SR9. Its main purpose is messaging, so it declares message producers and consumers (Kafka and RabbitMQ):

spring:
  cloud:
    stream:
      default-binder: kafka
      contentType: application/*+avro
      bindings:
        loanContractWaitingForActivationInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        loanContractComponentsChangedInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        loanOfferPresentedInput:
          binder: rabbit
          destination: icebank
          group: LH_LoanEventKeeper_testing2
          contentType: application/json
        postingEntryRequestInput:
          destination: PAY_PaymentServices_PostingEntryRequest
          group: LH_LoanEventKeeper
          contentType: application/*+avro
          consumer:
            maxAttempts: 1
            autoCommitOnError: true
            headerMode: embeddedHeaders

      kafka:
        bindings:
          postingEntryRequestInput:
            consumer:
              autoCommitOnError: true
        binder:
          configuration:
            security:
              protocol: ${KAFKA_SECURITY_PROTOCOL:SSL}
            ssl:
              truststore:
                location: path
                password: ****
                type: JKS

      rabbit:
        binder:
          useSSL: ${AMQP_SSL_ENABLED:true}
        bindings:
          loanContractWaitingForActivationInput:
            consumer:
              bindingRoutingKey: loans.contracts.waitingForActivation
              exchange-auto-delete: false
          loanContractComponentsChangedInput:
            consumer:
              bindingRoutingKey: loans.contracts.components
              exchange-auto-delete: false
          loanOfferPresentedInput:
            consumer:
              bindingRoutingKey: loans.offers.presented
              exchange-auto-delete: false

along with these typed @StreamListener methods:

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_WAITING_FOR_ACTIVATION_INPUT)
    void receiveLoanContractWaitingForActivation(@Payload LoanContractWaitingForActivationRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_COMPONENTS_CHANGED_INPUT)
    void receiveLoanContractComponentsChanged(@Payload LoanContractComponentsChangedRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsRabbit.LOAN_OFFER_PRESENTED_INPUT)
    void receiveLoanOfferPresented(@Payload LoanOfferPresentedRabbit message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

    @StreamListener(target = MultiInputChannelsKafka.POSTING_ENTRY_REQUEST_INPUT)
    void receivePostingEntryRequest(@Payload PostingEntryRequest message, @Headers MessageHeaders headers) {
        messageBusConsumerService.receive(message, headers);
    }

The RabbitMQ exchange is configured as follows: [image not available]

The problem is that, with this configuration, a message with a given routing key is received on all four @StreamListener endpoints, in turn. First receiveLoanContractWaitingForActivation(..) is invoked, with a message whose type does not match the payload type declared as its first parameter. Next receiveLoanContractComponentsChanged(..) is invoked, with the same mismatch. Then the correct listener for the message's routing key, receiveLoanOfferPresented(..), is invoked and the message is processed correctly. Finally the incorrect receivePostingEntryRequest(..) is invoked and ends up the same way as the first and second.

So it seems that I am failing to bind each @StreamListener to the proper queue on the exchange, which I thought would be determined by the routing key.

Can you please point out the missing or incorrect piece of configuration here?

Thanks!

Answer:

You can't use the same destination/group for all 3 bindings; you need a separate queue for each.

The routing key is simply a binding between the queue and the exchange; with your current setup you have a single queue with three bindings.

Change the groups so they are unique.
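
As a rough sketch of that change, the three rabbit bindings from the question could each get their own group. With the RabbitMQ binder the queue is named `<destination>.<group>`, so unique groups give three separate queues on the `icebank` exchange. The group names below are hypothetical, chosen only for illustration; the `bindingRoutingKey` entries under `spring.cloud.stream.rabbit.bindings` are assumed to stay as in the question.

```yaml
spring:
  cloud:
    stream:
      bindings:
        loanContractWaitingForActivationInput:
          binder: rabbit
          destination: icebank
          # hypothetical group name; queue: icebank.LH_LoanEventKeeper_waitingForActivation
          group: LH_LoanEventKeeper_waitingForActivation
          contentType: application/json
        loanContractComponentsChangedInput:
          binder: rabbit
          destination: icebank
          # hypothetical group name; queue: icebank.LH_LoanEventKeeper_componentsChanged
          group: LH_LoanEventKeeper_componentsChanged
          contentType: application/json
        loanOfferPresentedInput:
          binder: rabbit
          destination: icebank
          # hypothetical group name; queue: icebank.LH_LoanEventKeeper_offerPresented
          group: LH_LoanEventKeeper_offerPresented
          contentType: application/json
```

Each queue is then bound to the exchange with only its own routing key, so each listener should see only the messages meant for it.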
