I have been trying to implement the following solution:
My application is expected to consume message A from all-messages, do some business logic, and then produce message B back into all-messages.
The reason I am using StreamBridge instead of a Function<A,B> is that I want the producing side to work with an arbitrary number of produced messages. For the sake of this example, I have simplified the scenario to just one.
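To make the difference concrete, here is a minimal, framework-free sketch. Sender is a hypothetical stand-in that only mirrors the shape of StreamBridge.send(bindingName, payload); FanOutSketch, the incoming factory method and the string payloads are assumptions for illustration, not code from my repository. A Function<A,B> returns exactly one value per input, whereas a consumer that holds a sender can emit zero, one, or many messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Framework-free sketch. Sender is a hypothetical stand-in that mirrors
// the shape of StreamBridge.send(bindingName, payload).
class FanOutSketch {

    interface Sender {
        boolean send(String bindingName, Object payload);
    }

    // Builds a consumer that emits `copies` outgoing messages per incoming one.
    static Consumer<String> incoming(Sender sender, int copies) {
        return messageA -> {
            for (int i = 0; i < copies; i++) {
                sender.send("outgoing-out-0", "B" + i + " derived from " + messageA);
            }
        };
    }

    public static void main(String[] args) {
        // Record what would have been sent instead of talking to a broker.
        List<Object> sent = new ArrayList<>();
        Sender recordingSender = (bindingName, payload) -> sent.add(payload);

        incoming(recordingSender, 3).accept("A");
        System.out.println(sent.size() + " messages sent: " + sent);
    }
}
```

In the real application the sender would be the injected StreamBridge, and the number of produced messages would come from the business logic rather than a fixed count.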
Additionally, there is a custom router function, needed to avoid an infinite loop, that dispatches each incoming message to the appropriate consumer, either incoming or discarded. It would be great to have a way to effectively discard messages.
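The routing rule can be sketched as a plain function. This is only an illustration of the decision, not the router from my repository: the type header name and the RouterSketch class are hypothetical, and only the returned function names (incoming, discarded) come from my setup. Because B is produced into the same topic the application consumes from, anything that is not an A has to go to discarded, otherwise the application would keep reacting to its own output.

```java
import java.util.Map;

// Sketch of the routing decision only. The "type" header is an assumed
// convention for telling message A apart from everything else.
class RouterSketch {

    // Returns the name of the function that should handle the message.
    static String route(Map<String, Object> headers) {
        Object type = headers.get("type");
        return "A".equals(type) ? "incoming" : "discarded";
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("type", "A")));
        System.out.println(route(Map.of("type", "B")));
        System.out.println(route(Map.of()));
    }
}
```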
That being said, I cannot quite get the implementation right using Spring Cloud Stream.
I'd like your help to understand what I am doing wrong and how to fix the current configuration in order to make the solution work as expected, specifically:
- Is this solution supported by Spring Cloud Stream?
- Is my application configuration correctly implementing the solution diagram above?
- Which bindings exactly should I use inside the app when sending/receiving messages?
The major headache comes from the bindings, so I tried to write a test with different combinations of incoming and outgoing bindings to see what is what, something along these lines:
class ScsProblemTests {
    /* ... */

    @ParameterizedTest
    @MethodSource("bindings")
    void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
        givenNoOutgoingMessages();
        whenAnIncomingMessageArrives(incomingBinding);
        thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
    }

    public static Stream<Arguments> bindings() {
        return Stream.of(
            Arguments.of(null, null),
            Arguments.of(null, "outgoing-out-0"),
            Arguments.of(null, "all-messages"),
            Arguments.of("incoming-in-0", null),
            Arguments.of("incoming-in-0", "outgoing-out-0"),
            Arguments.of("incoming-in-0", "all-messages"),
            Arguments.of("all-messages", null),
            Arguments.of("all-messages", "outgoing-out-0"),
            Arguments.of("all-messages", "all-messages")
        );
    }
    /* ... */
}
I am running this set of tests with an overrides Spring profile, where I set up the destination overrides as per the diagram. I am also running the same set with a different Spring profile, no-overrides, just to have a control group to compare with. Only 2 tests from the no-overrides profile pass; the rest fail.
The no-overrides profile obviously does not match the design, but I was curious to see how the overrides were affecting the results. Specifically, the no-overrides tests that pass are the ones where:
- incomingBinding=null, outgoingBinding=null
- incomingBinding=null, outgoingBinding=outgoing-out-0
From my understanding of Spring Cloud Stream, even in this no-overrides case I would also expect the following to pass (they do not):
- incomingBinding=incoming-in-0, outgoingBinding=null
- incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0
At this point I am starting to think I misunderstood some concepts behind Spring Cloud Stream, but I really hope you can provide some useful advice.
I have shared my code in this repository for convenience.
Thanks in advance.
CodePudding user response:
It seems to me you are reusing the same queue for different types of messages, which forces you to introduce the routing. Furthermore, the two functions appear to exist in the same runtime space, so why not just pass by reference (invoke one function from another)? Why do you need to introduce networking?
I don't know your use case, so I can only comment on what I see, but it appears there is a design issue here. Perhaps if you share the actual use case we can help with an appropriate solution.
CodePudding user response:
OK, here are the problems in my implementation and how to fix them:
- I had a typo in the routing configuration, so the function router was not enabled. This is how to enable the function router in Spring Cloud Stream:
spring:
  cloud:
    stream:
      function:
        routing:
          enabled: true
- Because a function router is involved, I no longer need to configure my incoming-in-0 binding; instead, I need to configure a destination for the function router:
spring:
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          destination: all-messages
        outgoing-out-0:
          destination: all-messages
      source: outgoing
      # ...
- I misunderstood bindings and destinations, and how to use the testing helpers provided by the framework, specifically InputDestination and OutputDestination. I was not sure which parameters I should use to send or receive a message. The answer is that those components are there to simulate the real binder (e.g. RabbitMQ, Kafka, etc.), and they have no knowledge of a binding (which is a Spring Cloud Stream construct); they only know about a destination. So in my case that translated into something like this:
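import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    /* ... */

    @Test
    void consumeFromAndProduceIntoSharedTopic() {
        // prepare message A ...
        // simulate message "A" arriving into "all-messages"
        input.send(messageA, "all-messages");
        // ...
        // application will pick up the message
        // the function router will dispatch the message to the right consumer
        // the consumer does some business logic
        // eventually a message "B" should be produced into "all-messages"
        // check if "all-messages" contains message "B"
        // NOTE: "all-messages" will contain both "A" and "B"
        var discard = output.receive(1000, "all-messages"); // message A
        var messageB = output.receive(1000, "all-messages");
        // assertions ...
    }
    /* ... */
}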
Note: as per the comments in the pseudo-code, the final state has both A and B in all-messages. In this case the OutputDestination is simply a window onto the shared channel, which obviously also contains the initial message we sent.
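The ordering described in the note can be illustrated without Spring at all. In this sketch a LinkedBlockingQueue plays the role of the all-messages destination; SharedDestinationSketch, the simulate method and the string payloads are made up for illustration and are not part of the test binder API. It only models the behaviour I observed, namely that the first receive returns A and the second returns B.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// A queue stands in for the shared "all-messages" destination.
class SharedDestinationSketch {

    // Simulates the flow: the test sends A, then the application produces B
    // into the very same destination.
    static BlockingQueue<String> simulate() {
        BlockingQueue<String> allMessages = new LinkedBlockingQueue<>();
        allMessages.add("A"); // sent by the test
        allMessages.add("B"); // produced by the application in response to A
        return allMessages;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> allMessages = simulate();
        // The first read returns the message we sent ourselves, so it is discarded.
        String discard = allMessages.poll(1000, TimeUnit.MILLISECONDS);
        // The second read returns the message the application produced.
        String messageB = allMessages.poll(1000, TimeUnit.MILLISECONDS);
        System.out.println("discarded " + discard + ", asserting on " + messageB);
    }
}
```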
Hopefully this makes sense. I cleaned up the code and pushed a working version to a fixed branch in the same repository so you can see the actual fixes.