I have a project that is spring-cloud-starter-parent:Hoxton.SR9
.
the main objective here is messaging, thus there are message (kafka/rabbit) producers and consumers declared:
spring:
cloud:
stream:
default-binder: kafka
contentType: application/* avro
bindings:
loanContractWaitingForActivationInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanContractComponentsChangedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
loanOfferPresentedInput:
binder: rabbit
destination: icebank
group: LH_LoanEventKeeper_testing2
contentType: application/json
postingEntryRequestInput:
destination: PAY_PaymentServices_PostingEntryRequest
group: LH_LoanEventKeeper
contentType: application/* avro
consumer:
maxAttempts: 1
autoCommitOnError: true
headerMode: embeddedHeaders
kafka:
bindings:
postingEntryRequestInput:
consumer:
autoCommitOnError: true
binder:
configuration:
security:
protocol: ${KAFKA_SECURITY_PROTOCOL:SSL}
ssl:
truststore:
location: path
password: ****
type: JKS
rabbit:
binder:
useSSL: ${AMQP_SSL_ENABLED:true}
bindings:
loanContractWaitingForActivationInput:
consumer:
bindingRoutingKey: loans.contracts.waitingForActivation
exchange-auto-delete: false
loanContractComponentsChangedInput:
consumer:
bindingRoutingKey: loans.contracts.components
exchange-auto-delete: false
loanOfferPresentedInput:
consumer:
bindingRoutingKey: loans.offers.presented
exchange-auto-delete: false
alongside with typed @StreamListener
s provided:
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_WAITING_FOR_ACTIVATION_INPUT)
void receiveLoanContractWaitingForActivation(@Payload LoanContractWaitingForActivationRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_CONTRACT_COMPONENTS_CHANGED_INPUT)
void receiveLoanContractComponentsChanged(@Payload LoanContractComponentsChangedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsRabbit.LOAN_OFFER_PRESENTED_INPUT)
void receiveLoanOfferPresented(@Payload LoanOfferPresentedRabbit message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
@StreamListener(target = MultiInputChannelsKafka.POSTING_ENTRY_REQUEST_INPUT)
void receivePostingEntryRequest(@Payload PostingEntryRequest message, @Headers MessageHeaders headers) {
messageBusConsumerService.receive(message, headers);
}
with rabbitmq exchange configured:
with that being said, problem is that with this configuration program receives the message with specific routing key on all 4 streamListener consumer endpoints. in turn. so first
receiveLoanContractWaitingForActivation(..)
is invoked with the message type not matching the declared payload as the first parameter goes, after that receiveLoanContractComponentsChanged(..)
same story as with the first, then the correct one (according to the message's routing key: receiveLoanOfferPresented(..)
), message gets processed correctly as the correct listener was invoked then, the last, incorrect one receivePostingEntryRequest(..)
ending up the same as first and second.
so basically it seems like I'm failing to bind @StreamListener
to the proper queue in the exchange which I thought will be specified by the routing key.
Can you, please, point out missing/incorrect piece of configuration in here?
thanks!
CodePudding user response:
You can't use the same destination/group for all 3 bindings; you need a separate queue for each.
The routing key is simply a binding between the queue and the exchange; with your current setup you have a single queue with three bindings.
Change the groups so they are unique.