Spring Cloud Stream - Testing consuming from and producing into a shared topic

Time:02-09

I have been trying to implement the following solution:

[Diagram: spring-cloud-stream-shared-topic]

My application is expected to consume message A from all-messages, apply some business logic, and then produce message B back into all-messages.

I am using StreamBridge instead of a Function<A,B> because I want the producing side to handle an arbitrary number of produced messages; for the sake of this example I have simplified the scenario to just one.
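
To make that distinction concrete, here is a minimal plain-Java sketch with no Spring involved. The class name FanOutSketch, the "B(...)" payload format, and the comma-splitting rule are all hypothetical; the Consumer<String> sink only stands in for a call such as StreamBridge.send and is not the framework API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class FanOutSketch {

    // Function style: every input yields exactly one output.
    static final Function<String, String> ONE_TO_ONE = a -> "B(" + a + ")";

    // Sink style: the handler decides how many outputs to emit (zero, one, or many).
    // The sink is a stand-in for an explicit "send to the output binding" call.
    static final BiConsumer<String, Consumer<String>> ONE_TO_MANY = (a, sink) -> {
        for (String part : a.split(",")) {
            if (!part.isBlank()) {
                sink.accept("B(" + part.trim() + ")");
            }
        }
    };

    // Helper that collects everything the sink-style handler emits for one input.
    static List<String> collect(String input) {
        List<String> produced = new ArrayList<>();
        ONE_TO_MANY.accept(input, produced::add);
        return produced;
    }

    public static void main(String[] args) {
        System.out.println(ONE_TO_ONE.apply("x"));
        System.out.println(collect("x, y"));
        System.out.println(collect(""));
    }
}
```

With the sink style, an empty input produces no messages at all, which a one-in-one-out Function cannot express.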

Additionally, to avoid an infinite loop, a custom router function dispatches each incoming message to the appropriate consumer, either incoming or discarded. It would be great to have a way to effectively discard messages.
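
The loop-avoidance idea can be sketched in plain Java as follows. This is only a model under stated assumptions: the Msg class, the "A"/"B" type tags, and the in-memory queue are hypothetical stand-ins for the real topic and router, and the rule that only type "A" reaches the business consumer is assumed for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SharedTopicSketch {

    // Hypothetical message shape: a type tag plus a payload.
    static final class Msg {
        final String type;
        final String payload;

        Msg(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Routing decision: only type "A" goes to the business consumer.
    static String route(Msg msg) {
        return "A".equals(msg.type) ? "incoming" : "discarded";
    }

    // Drains the shared topic and returns every message that passed through it, in order.
    static List<Msg> run(Msg first) {
        Deque<Msg> allMessages = new ArrayDeque<>();
        List<Msg> seen = new ArrayList<>();
        allMessages.add(first);
        while (!allMessages.isEmpty()) {
            Msg msg = allMessages.poll();
            seen.add(msg);
            if ("incoming".equals(route(msg))) {
                // The business consumer publishes B back into the same topic.
                allMessages.add(new Msg("B", "processed:" + msg.payload));
            }
            // "discarded": the message is dropped, so B never triggers another B.
        }
        return seen;
    }

    public static void main(String[] args) {
        for (Msg m : run(new Msg("A", "hello"))) {
            System.out.println(m.type + " -> " + route(m));
        }
    }
}
```

Without the router, the consumer would also receive its own B messages and keep producing forever; routing B to the discarded branch is what makes the loop terminate.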

That being said I cannot quite get the implementation right using Spring Cloud Stream.

I'd like your help to understand what I am doing wrong and how to fix the current configuration/setup so that the solution works as expected, specifically:

  • Is this solution supported by Spring Cloud Stream?
  • Is my application configuration correctly implementing the solution diagram above?
  • Which bindings exactly should I use inside the app when sending/receiving messages?

The major headache comes from the bindings, so I tried to write a test with different combinations of incoming and outgoing bindings to see what is what, something along these lines:

class ScsProblemTests {

    /* ... */

    @ParameterizedTest
    @MethodSource("bindings")
    void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
        givenNoOutgoingMessages();
        whenAnIncomingMessageArrives(incomingBinding);
        thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
    }

    public static Stream<Arguments> bindings() {
        return Stream.of(
            Arguments.of(null, null),
            Arguments.of(null, "outgoing-out-0"),
            Arguments.of(null, "all-messages"),
            Arguments.of("incoming-in-0", null),
            Arguments.of("incoming-in-0", "outgoing-out-0"),
            Arguments.of("incoming-in-0", "all-messages"),
            Arguments.of("all-messages", null),
            Arguments.of("all-messages", "outgoing-out-0"),
            Arguments.of("all-messages", "all-messages")
        );
    }

    /* ... */
}

I am running this set of tests with an overrides Spring profile, where I set up the destination overrides as per the diagram. I am also running the same set with a different Spring profile with no overrides, as a control group to compare against. Only 2 tests from the no-overrides profile pass; the rest fail.

The no-overrides profile obviously does not match the design, but I was curious to see how the overrides were affecting the results. Specifically, the no-overrides tests that pass are the ones where:

  • incomingBinding=null, outgoingBinding=null
  • incomingBinding=null, outgoingBinding=outgoing-out-0

From my understanding of Spring Cloud Stream, even in this no-overrides case, I would also expect the following to pass (they do not):

  • incomingBinding=incoming-in-0, outgoingBinding=null
  • incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0

At this point I am starting to think I misunderstood some concepts behind Spring Cloud Stream, but I really hope you can provide some useful advice.

I have shared my code into this repository for convenience.

Thanks in advance.

CodePudding user response:

It seems to me you are reusing the same queue for different types of messages, which forces you to introduce the routing. Furthermore, the two functions appear to exist in the same runtime space, so why not just pass by reference (invoke one function from another)? Why do you need to introduce networking?

I don't know your use case, so I can only comment on what I see, but it appears there is a design issue here. Perhaps if you share the actual use case we can help with an appropriate solution.

CodePudding user response:

OK, here are the problems in my implementation and how to fix them:

  1. I had a typo in the routing configuration, so the function router wasn't enabled. This is how to enable the function router in Spring Cloud Stream:
spring:
  cloud:
    stream:
      function:
        routing:
          enabled: true
  2. Because a function router is involved, I no longer need to configure my incoming-in-0 binding; instead I need to configure a destination for the function router:
spring:
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          destination: all-messages
        outgoing-out-0:
          destination: all-messages
      source: outgoing
      # ...
  3. I misunderstood bindings/destinations and how to use the testing helpers provided by the framework, specifically InputDestination and OutputDestination. I was not sure which parameters to use when sending or receiving a message. The answer is that those components simulate the real binder (e.g. RabbitMQ, Kafka, etc.), and they have no knowledge of a binding (which is a Spring Cloud Stream construct); they only know about destinations. So in my case that translated into something like this:
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    /* ... */

    @Test
    void consumeFromAndProduceIntoSharedTopic() {
        // prepare message A ... 

        // simulate message "A" arriving into "all-messages"
        input.send(messageA, "all-messages");
        
        // ...
        // application will pick up the message
        // the function router will dispatch the message to the right consumer
        // the consumer does some business logic
        // eventually a message "B" should be produced into "all-messages"
        
        // check if "all-messages" contains message "B"
        // NOTE: "all-messages" will contain both "A" and "B"
        var discard = output.receive(1000, "all-messages"); // message A
        var messageB = output.receive(1000, "all-messages");
        // assertions ...
    }

    /* ... */
}

Note: as per the comments in the pseudo-code, the final state has both A and B in all-messages. In this case the OutputDestination is simply a window onto the shared channel, which naturally also contains the initial message we sent.
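
To illustrate the binding-versus-destination point, here is a deliberately simplified plain-Java model. It is not the real test binder: the DestinationModel class and its send, receive, and sendViaBinding methods are hypothetical, and only the binding and destination names are taken from the configuration above.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DestinationModel {

    // Application-side configuration: binding name -> destination name.
    static final Map<String, String> BINDINGS = Map.of(
            "functionRouter-in-0", "all-messages",
            "outgoing-out-0", "all-messages");

    // "Broker" side: queues are keyed by destination only, never by binding.
    private final Map<String, BlockingQueue<String>> destinations = new ConcurrentHashMap<>();

    // What a test helper does: address a destination directly.
    void send(String payload, String destination) {
        destinations.computeIfAbsent(destination, d -> new LinkedBlockingQueue<>()).add(payload);
    }

    // Returns the next payload on the destination, or null if none arrives in time.
    String receive(long timeoutMs, String destination) {
        BlockingQueue<String> queue = destinations.get(destination);
        if (queue == null) {
            return null; // no such destination, e.g. when a binding name is passed by mistake
        }
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // What the application does: resolve the binding to its destination, then send.
    void sendViaBinding(String payload, String bindingName) {
        send(payload, BINDINGS.get(bindingName));
    }

    public static void main(String[] args) {
        DestinationModel broker = new DestinationModel();
        broker.send("A", "all-messages");               // the test simulates A arriving
        broker.sendViaBinding("B", "outgoing-out-0");   // the application produces B
        System.out.println(broker.receive(10, "outgoing-out-0")); // binding name: nothing found
        System.out.println(broker.receive(10, "all-messages"));   // first message on the topic
        System.out.println(broker.receive(10, "all-messages"));   // second message on the topic
    }
}
```

In this model, asking for "outgoing-out-0" finds nothing because that is a binding name, while "all-messages" yields A and then B, matching the discard-then-assert pattern in the test above.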

Hopefully this makes sense. I cleaned and pushed a working version of the code into a fixed branch in the same repository so you can see the actual fixes.
