spring-integration MockIntegrationContext ReactiveStreamConsumer: IllegalArgumentException: 'su

Time:01-10

My test fails with IllegalArgumentException: 'subscriber' must not be null in the "...endpoint", which points to a missing 'subscriber' field.

    @Test
    public void test() throws InterruptedException {
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext
                .substituteMessageHandlerFor(
                        "test2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1",
                        mockMessageHandler);
        this.integrationFlowWithReactiveConsumerHandler.getInputChannel().send(new GenericMessage<>("test2"));
        assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertThat(captor.getValue().getPayload())
                .isEqualTo("reactive-message-text");
    }

It fails in MockIntegrationContext.java, in the substituteMessageHandlerFor method, when the endpoint is a ReactiveStreamsConsumer:

    public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR - complexity
            MessageHandler mockMessageHandler, boolean autoStartup) {

        Object endpoint = this.beanFactory.getBean(consumerEndpointId, IntegrationConsumer.class);
        if (autoStartup && endpoint instanceof Lifecycle) {
            ((Lifecycle) endpoint).stop();
        }
        DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(endpoint);
        Object targetMessageHandler = directFieldAccessor.getPropertyValue(HANDLER);
        Assert.notNull(targetMessageHandler, () -> "'handler' must not be null in the: " + endpoint);
        if (endpoint instanceof ReactiveStreamsConsumer) {
            Object targetSubscriber = directFieldAccessor.getPropertyValue("subscriber");
            Assert.notNull(targetSubscriber, () -> "'subscriber' must not be null in the: " + endpoint);

The handler endpoint bean is ...:reactive-outbound-channel-adapter, which is instantiated through the ReactiveStreamsConsumer constructor below, where the 'subscriber' field is set to null:

    /**
     * Instantiate an endpoint based on the provided {@link MessageChannel} and {@link ReactiveMessageHandler}.
     * @param inputChannel the channel to consume in reactive manner.
     * @param reactiveMessageHandler the {@link ReactiveMessageHandler} to process messages.
     * @since 5.3
     */
    public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandler reactiveMessageHandler) {
        Assert.notNull(inputChannel, "'inputChannel' must not be null");
        this.inputChannel = inputChannel;
        this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
        this.reactiveMessageHandler = reactiveMessageHandler;
        this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
        this.subscriber = null;
        this.lifecycleDelegate =
                reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
    }

During the test, the endpoint bean is created through this constructor, and substituteMessageHandlerFor then throws the exception because the 'subscriber' field is null.
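The failure described above can be reproduced in miniature without Spring. The following is only a sketch under stated assumptions: FakeReactiveEndpoint and SubscriberCheckDemo are hypothetical stand-ins, not Spring Integration classes, and the reflective check merely imitates the DirectFieldAccessor plus Assert.notNull combination from the excerpt.

```java
import java.lang.reflect.Field;

// Hypothetical demo class: imitates the null check from the excerpt above.
final class SubscriberCheckDemo {

    // Reads a private field reflectively and rejects null, roughly like
    // DirectFieldAccessor.getPropertyValue(...) followed by Assert.notNull(...).
    static Object requireField(Object target, String name) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            Object value = field.get(target);
            if (value == null) {
                throw new IllegalArgumentException(
                        "'" + name + "' must not be null in the: " + target);
            }
            return value;
        }
        catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot read field '" + name + "'", ex);
        }
    }

    public static void main(String[] args) {
        // An endpoint built with a subscriber passes the check.
        System.out.println(requireField(
                FakeReactiveEndpoint.withSubscriber("handler", "subscriber"), "subscriber"));
        try {
            // An endpoint built the "reactive handler" way has no subscriber.
            requireField(FakeReactiveEndpoint.forReactiveHandler("handler"), "subscriber");
        }
        catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

// Hypothetical stand-in for an endpoint with two construction paths.
final class FakeReactiveEndpoint {

    private final Object handler;

    private final Object subscriber;

    private FakeReactiveEndpoint(Object handler, Object subscriber) {
        this.handler = handler;
        this.subscriber = subscriber;
    }

    // Models a construction path that sets both fields.
    static FakeReactiveEndpoint withSubscriber(Object handler, Object subscriber) {
        return new FakeReactiveEndpoint(handler, subscriber);
    }

    // Models the constructor quoted above: the subscriber is left null.
    static FakeReactiveEndpoint forReactiveHandler(Object reactiveHandler) {
        return new FakeReactiveEndpoint(reactiveHandler, null);
    }

    @Override
    public String toString() {
        return "FakeReactiveEndpoint[handler=" + this.handler + "]";
    }
}
```

The point of the sketch is that the check assumes every endpoint of this type carries a subscriber, while one construction path never sets it.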

The integration flow is simple, with a single reactive DB handler.

Any ideas? Thanks a lot.

CodePudding user response:

This is a bug in the testing framework. When we introduced ReactiveMessageHandler support into ReactiveStreamsConsumer, we missed adjusting the MockIntegrationContext logic accordingly.

At the moment there is no way to mock a reactive endpoint directly, and no reasonable workaround for doing so. However, you can introduce an intermediate endpoint into the flow, e.g. bridge(), and mock just that one without producing any reply. Your test will then pass, and nothing will be sent to the real reactive endpoint.
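The idea behind this workaround can be modelled in plain Java. This is only an illustrative sketch: BridgeMockDemo and Flow are hypothetical names and use no Spring Integration API. In a real flow, the intermediate step would presumably be the bridge() endpoint, with its bean id passed to substituteMessageHandlerFor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a flow with a replaceable intermediate step.
final class BridgeMockDemo {

    static final class Flow {

        private Consumer<String> intermediate;

        Flow(Consumer<String> terminal) {
            // By default the intermediate step is a pass-through "bridge"
            // that forwards every payload to the terminal step.
            this.intermediate = terminal::accept;
        }

        // Replaces the intermediate step, e.g. with a mock that never forwards.
        void substituteIntermediate(Consumer<String> mock) {
            this.intermediate = mock;
        }

        void send(String payload) {
            this.intermediate.accept(payload);
        }
    }

    public static void main(String[] args) {
        List<String> reachedRealEndpoint = new ArrayList<>();
        List<String> capturedByMock = new ArrayList<>();

        Flow flow = new Flow(reachedRealEndpoint::add);
        // The mock records the payload and produces no reply,
        // so the terminal (real) step is never invoked.
        flow.substituteIntermediate(capturedByMock::add);
        flow.send("test2");

        System.out.println("mock saw: " + capturedByMock);
        System.out.println("real endpoint saw: " + reachedRealEndpoint);
    }
}
```

With this arrangement the test can only assert on what reaches the intermediate step. The behaviour of the real reactive handler itself stays untested.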

Feel free to raise a GH issue and we will look into that ASAP.
