Reactor TestPublisher not working with non-completing flux

Time:02-11

I think this is a very basic question about unit testing with Project Reactor (Java), but I've been looking at the docs and can't figure out what I'm misunderstanding. I've been trying to use TestPublisher and StepVerifier to test a flux that is supposed to be never-ending, so my scenario doesn't include a terminal signal.

Whereas using a regular sink created with Sinks.many().unicast() works as I expect, using TestPublisher doesn't:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import java.time.Duration;

public class ExploreStepVerifier {

    // this test fails: Did not observe any item or terminal signal within 1000ms in 'source(FluxSource)'
    @Test
    void test_with_test_publisher() {
        TestPublisher<String> publisher = TestPublisher.create();
        publisher.next("hello");
        var result = publisher.flux();
        StepVerifier.create(result)
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1));
    }

    // this test succeeds
    @Test
    void test_with_sink() {
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        sink.tryEmitNext("hello");
        var result = sink.asFlux();
        StepVerifier.create(result)
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1));
    }
}

I expected to be able to substitute TestPublisher for the sink, as shown in test_with_test_publisher, but it doesn't work. Can anyone explain? Thanks.

Answer:

That sink variant "warms up": it retains a (limited) number of signals if they occur before any subscriber comes in. The default TestPublisher doesn't; if nobody's listening, the value is dropped. To keep signals emitted before subscription, use the cold variant, TestPublisher.createCold().
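
As a rough sketch of that suggestion, the first method below swaps TestPublisher.create() for TestPublisher.createCold(), so the value emitted before subscription should still reach StepVerifier. The second method is an alternative that is not part of the answer above: it keeps the hot publisher but emits only after subscription, using StepVerifier's then(Runnable) step. The class and method names are made up for illustration, and the methods return the Duration from verifyTimeout only so they can be called outside JUnit.

```java
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import java.time.Duration;

// Hypothetical class name, used only for this illustration.
public class ColdTestPublisherSketch {

    // Cold variant: signals emitted before subscription are kept
    // and delivered once a subscriber arrives.
    static Duration verifyWithColdPublisher() {
        TestPublisher<String> publisher = TestPublisher.createCold();
        publisher.next("hello"); // no subscriber yet, but the value is retained
        return StepVerifier.create(publisher.flux())
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1)); // no terminal signal expected
    }

    // Hot variant: emit only after StepVerifier has subscribed,
    // so there is a listener when the value is pushed.
    static Duration verifyWithHotPublisher() {
        TestPublisher<String> publisher = TestPublisher.create();
        return StepVerifier.create(publisher.flux())
            .then(() -> publisher.next("hello")) // runs after subscription
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        verifyWithColdPublisher();
        verifyWithHotPublisher();
        System.out.println("both verifications passed");
    }
}
```

Either body can be dropped into a @Test method in place of test_with_test_publisher. Which one fits better depends on whether the test is meant to model a source that emitted before anyone subscribed (cold) or one that emits while the subscriber is attached (hot with then).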
