TLDR; How do you test a Reactive Function composition using the Test Binder?
I have a Spring Cloud Stream that uses Reactive Functions and I don't know how to test it. I don't see any official docs on how to do an Integration Test from input source to output destination binder.
In my specific case, I am connecting a Spring Integration flow using a Reactive Supplier and the IntegrationReactiveUtils.messageChannelToFlux()
pattern. This works in a development environment - I can pull messages from RabbitMQ using the Spring Integration Flow and they enter the SCSt.
My SCSt has several function chained together, each one is reactive. They are composed like func1|func2|func3
. I verified this works with a dev Rabbit (source) and Kafka (Destination).
I can't seem to figure out how to test this, and there doesn't seem to be any official documentation on testing a complete reactive stream. Right now I have code that roughly looks like this:
@Autowired
MessageChannel inputChannel;
@Autowired
private OutputDestination output;
@Test
void myTest() {
//omitted prep of var 'messageToSend'
this.inputChannel.send(messageToSend);
var outputMessage = output.receive(5000);
Assertions.assertNotNull(outputMessage.getPayload());
}
The error I receive is that output.receive(5000)
returns null
. I suspect a threading issue because I am not subscribing to the Flux and waiting for completion.
I have run a debugger in the Flux functions and see the message going all the way to the end with no errors or weirdness.
CodePudding user response:
I figured this out actually. I had to specify the binder name. I had a test property spring.cloud.stream.bindings.processingStream
set, which I thought made 2 new bindings (processingStream-in-0
and processingStream-out-0
).
It turns out I had to set the binding name in the test code like output.receive(5000, "processingStream")
, without the -out-0
suffix. I can now receive messages from the stream.