Home > Software design >  Writing blocking operations in reactor tests with Spring and State Machine
Writing blocking operations in reactor tests with Spring and State Machine

Time:01-30

I'm completely new to reactor programming and I'm really struggling with migrating old integration tests since upgrading to the latest Spring Boot / State Machine. Most Integration tests have the same basic steps :

  1. Call a method that returns a Mono and starts a state Machine and returns an object containing a generated unique id as well as some other infos related to the initial request.
  2. With the returned object call a method that verifies if a value has been updated in the database (using the information of the object retried in step 1)
  3. Poll at a fixed interval the method that checks in the database if value has changed until either the value has changed or a predefined timeout occurs.
  4. Check another table in the database if another object has been updated

Below an example:

    @Test
    void testEndToEnd() {
      var instance = ServiceInstance.buildDefault(); 
      var updateRequest = UpdateRequest.build(instance);

      // retrieve an update Response related to the request 
      // since a unique id is generated when triggering the update request
      // before starting a stateMachine that goes through different steps        
      var updateResponse = service.updateInstance(updateRequest).block(); 

      await().alias("Check if operation was successful")
             .atMost(Duration.ofSeconds(120))
             .pollInterval(Duration.ofSeconds(2))
             .until(() -> expectOperationState(updateResponse, OperationState.SUCCESS))
                        
        // check if values are updated in secondary table
        assertValuesInTransaction(updateResponse);
}

This was working fine before but ever since the latest update where it fails with the exception :

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-6
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    at reactor.core.publisher.Mono.block(Mono.java:1710)

I saw that a good practice to test reactor methods using StepVerifier but I do not see how I can reproduce the part done with Awaitability to poll to see if the value has changed in the DB since the method that checks in the DB returns a Mono and not a flux that keeps sending values.

Any idea on how to accomplish this or to make the spring stack accept blocking operations?

Thanks

My current stack :

  • Spring Boot 3.0.1
  • Spring State Machine 3.0.1
  • Spring 6
  • Junit 5.9.2

CodePudding user response:

So as discussed in comments here is an example with comments. I used flatMap to subscribe to what expectOperationState returns. Also there is Mono.fromCallable used which check the value from some method and if it fails to emit anything in 3 seconds - the timeout exception is thrown. Also we could try to get rid of this boolean value from expectOperationState and refactor the code to just return Mono<Void> with completed signal but this basically shows how you can achieve what you want.

class TestStateMachine {
    @Test
    void testUntilSomeOperationCompletes() {

        final Service service = new Service();
        final UpdateRequest updateRequest = new UpdateRequest();
        StepVerifier.create(service.updateInstance(updateRequest)
                        .flatMap(updateResponse -> expectOperationState(updateResponse, OperationState.SUCCESS))
                )
                .consumeNextWith(Assertions::assertTrue)
                .verifyComplete();

    }

    private Mono<Boolean> expectOperationState(final UpdateResponse updateResponse, final OperationState success) {
        return Mono.fromCallable(() -> {
                    while (true) {
                        boolean isInDb = checkValueFromDb(updateResponse);
                        if (isInDb) {
                            return true;
                        }
                    }
                })
                .publishOn(Schedulers.single())
                //timeout if we not receive any value from callable within 3 seconds so that we do not check forever
                .timeout(Duration.ofSeconds(3));

    }

    private boolean checkValueFromDb(final UpdateResponse updateResponse) {
        return true;
    }
    
}

class Service {
    Mono<UpdateResponse> updateInstance(final UpdateRequest updateRequest) {
        return Mono.just(new UpdateResponse());
    }
}

Here is an example without using Mono<Boolean> :


class TestStateMachine {
    @Test
    void test() {

        final Service service = new Service();
        final UpdateRequest updateRequest = new UpdateRequest();
        StepVerifier.create(service.updateInstance(updateRequest)
                        .flatMap(updateResponse -> expectOperationState(updateResponse, OperationState.SUCCESS).timeout(Duration.ofSeconds(3)))
                )
                .verifyComplete();
    }

    private Mono<Void> expectOperationState(final UpdateResponse updateResponse, final OperationState success) {
        return Mono.fromCallable(() -> {
                    while (true) {
                        boolean isInDb = checkValueFromDb(updateResponse);
                        if (isInDb) {
                            //return completed Mono
                            return Mono.<Void>empty();
                        }
                    }
                })
                .publishOn(Schedulers.single())
                //timeout if we not receive any value from callable within 3 seconds so that we do not check forever
                .timeout(Duration.ofSeconds(3))
                .flatMap(objectMono -> objectMono);


    }

    private boolean checkValueFromDb(final UpdateResponse updateResponse) {
        return true;
    }

}
  • Related