I'm completely new to reactor programming and I'm really struggling with migrating old integration tests since upgrading to the latest Spring Boot / State Machine. Most Integration tests have the same basic steps :
- Call a method that returns a
Mono
and starts a state Machine and returns an object containing a generated uniqueid
as well as some other infos related to the initial request. - With the returned object call a method that verifies if a value has been updated in the database (using the information of the object retried in step 1)
- Poll at a fixed interval the method that checks in the database if value has changed until either the value has changed or a predefined timeout occurs.
- Check another table in the database if another object has been updated
Below an example:
@Test
void testEndToEnd() {
var instance = ServiceInstance.buildDefault();
var updateRequest = UpdateRequest.build(instance);
// retrieve an update Response related to the request
// since a unique id is generated when triggering the update request
// before starting a stateMachine that goes through different steps
var updateResponse = service.updateInstance(updateRequest).block();
await().alias("Check if operation was successful")
.atMost(Duration.ofSeconds(120))
.pollInterval(Duration.ofSeconds(2))
.until(() -> expectOperationState(updateResponse, OperationState.SUCCESS))
// check if values are updated in secondary table
assertValuesInTransaction(updateResponse);
}
This was working fine before but ever since the latest update where it fails with the exception :
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-6
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
at reactor.core.publisher.Mono.block(Mono.java:1710)
I saw that a good practice to test reactor methods using StepVerifier
but I do not see how I can reproduce the part done with Awaitability
to poll to see if the value has changed in the DB since the method that checks in the DB returns a Mono
and not a flux that keeps sending values.
Any idea on how to accomplish this or to make the spring stack accept blocking operations?
Thanks
My current stack :
- Spring Boot 3.0.1
- Spring State Machine 3.0.1
- Spring 6
- Junit 5.9.2
CodePudding user response:
So as discussed in comments here is an example with comments. I used flatMap
to subscribe to what expectOperationState
returns. Also there is Mono.fromCallable
used which check the value from some method and if it fails to emit anything in 3 seconds - the timeout exception is thrown. Also we could try to get rid of this boolean
value from expectOperationState
and refactor the code to just return Mono<Void>
with completed signal but this basically shows how you can achieve what you want.
class TestStateMachine {
@Test
void testUntilSomeOperationCompletes() {
final Service service = new Service();
final UpdateRequest updateRequest = new UpdateRequest();
StepVerifier.create(service.updateInstance(updateRequest)
.flatMap(updateResponse -> expectOperationState(updateResponse, OperationState.SUCCESS))
)
.consumeNextWith(Assertions::assertTrue)
.verifyComplete();
}
private Mono<Boolean> expectOperationState(final UpdateResponse updateResponse, final OperationState success) {
return Mono.fromCallable(() -> {
while (true) {
boolean isInDb = checkValueFromDb(updateResponse);
if (isInDb) {
return true;
}
}
})
.publishOn(Schedulers.single())
//timeout if we not receive any value from callable within 3 seconds so that we do not check forever
.timeout(Duration.ofSeconds(3));
}
private boolean checkValueFromDb(final UpdateResponse updateResponse) {
return true;
}
}
class Service {
Mono<UpdateResponse> updateInstance(final UpdateRequest updateRequest) {
return Mono.just(new UpdateResponse());
}
}
Here is an example without using Mono<Boolean>
:
class TestStateMachine {
@Test
void test() {
final Service service = new Service();
final UpdateRequest updateRequest = new UpdateRequest();
StepVerifier.create(service.updateInstance(updateRequest)
.flatMap(updateResponse -> expectOperationState(updateResponse, OperationState.SUCCESS).timeout(Duration.ofSeconds(3)))
)
.verifyComplete();
}
private Mono<Void> expectOperationState(final UpdateResponse updateResponse, final OperationState success) {
return Mono.fromCallable(() -> {
while (true) {
boolean isInDb = checkValueFromDb(updateResponse);
if (isInDb) {
//return completed Mono
return Mono.<Void>empty();
}
}
})
.publishOn(Schedulers.single())
//timeout if we not receive any value from callable within 3 seconds so that we do not check forever
.timeout(Duration.ofSeconds(3))
.flatMap(objectMono -> objectMono);
}
private boolean checkValueFromDb(final UpdateResponse updateResponse) {
return true;
}
}