I'm having trouble understanding how to achieve my goal with a reactive approach. Let's assume I have a controller that returns a Flux:
@PostMapping(value = "/mutation/stream/{domainId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Mutation> getMutationReactive(@RequestBody List<MutationRequest> mutationRequests, @PathVariable Integer domainId) {
    return mutationService.getMutations(mutationRequests, domainId);
}
The service currently uses .subscribeOn(Schedulers.boundedElastic()), because it calls blocking code that is wrapped in a Callable:
public Flux<Mutation> getMutations(List<MutationRequest> mutationRequests, int domainId) {
    return Flux.fromIterable(mutationRequests)
        .subscribeOn(Schedulers.boundedElastic())
        .flatMap(mutationRequest -> getMutation(mutationRequest.getGameId(), mutationRequest.getTypeId(), domainId));
}
getMutation() makes the blocking calls, currently wrapped in a Callable:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return Mono.fromCallable(() -> {
        // mutationProvider.findByGameIdAndTypeId() returns Mono<Mutation>
        Mutation mutation = mutationProvider.findByGameIdAndTypeId(gameId, typeId).block();
        if (mutation == null) {
            throw new RuntimeException("Mutation was not found by gameId and typeId");
        }
        // stateService.getStatesByIds() returns Mono<State>
        State state = stateService.getStatesByIds(mutation.getId(), domainId).block();
        if (state == null || state.getValue() == null) {
            log.info("Requested mutation with gameId[%s] typeId[%s] domainId[%s] is disabled. Value is null.".formatted(gameId, typeId, domainId));
            return null;
        }
        mutation.setTemplateId(state.getTemplateId());
        return mutation;
    });
}
How should I rewrite getMutation() to use reactive streams instead of calling .block() inside a Callable? Basically, I first need to retrieve the Mutation from the DB -> then, using the ID of the mutation, get its state from another service -> then, if the state and its value are not null, set the templateId of the state on the mutation and return it, or otherwise return null.
I've tried something like this:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
        .flatMap(mutation -> {
            stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                if (state != null && state.getValue() != null) {
                    mutation.setTemplateId(state.getTemplateId());
                }
                //TODO if state/value is null -> need to propagate further to return null instead of mutation...
                return Mono.justOrEmpty(state);
            });
            return Mono.just(mutation);
        });
}
But it's obviously incorrect: nothing subscribes to stateService.getStatesByIds(mutation.getId(), domainId), and I would like to return null if the retrieved state of the mutation or its value is null.
Answer:
You are ignoring the result of the inner flatMap, hence the warning. I haven't tested this, but you need something like this:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
        .flatMap(mutation -> {
            return stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                if (state != null && state.getValue() != null) {
                    mutation.setTemplateId(state.getTemplateId());
                    return Mono.just(mutation);
                }
                return Mono.empty();
            });
        });
}
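As a rough illustration of why Mono.empty() can stand in for "return null": Reactor never emits null, and when the calling Flux uses flatMap, an empty inner Mono simply contributes no element. The sketch below assumes reactor-core is on the classpath; EmptyMonoDemo and lookup() are hypothetical stand-ins for your service and getMutation(), not part of your code.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EmptyMonoDemo {

    // Hypothetical stand-in for getMutation(): odd ids are treated as "disabled"
    static Mono<String> lookup(int id) {
        return id % 2 == 0 ? Mono.just("mutation-" + id) : Mono.empty();
    }

    // Mirrors the shape of getMutations(): one inner Mono per request
    static List<String> collect(List<Integer> ids) {
        return Flux.fromIterable(ids)
                .flatMap(EmptyMonoDemo::lookup) // empty Monos add nothing to the stream
                .collectList()
                .block(); // blocking here only to inspect the result in a demo
    }

    public static void main(String[] args) {
        // Only the even ids survive; the "disabled" ones are silently skipped
        System.out.println(collect(List.of(1, 2, 3, 4)));
    }
}
```

So with this shape your controller's Flux just omits the disabled mutations instead of carrying nulls.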
You could probably also use filter for the null check and turn the inner flatMap into a regular map, since it only sets a field. The outer flatMap has to stay, because the lambda returns a Mono. defaultIfEmpty is only useful if you want a fallback value: it takes a plain Mutation rather than a Mono, and an empty Mono already means "no result", so I left it out:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
        .flatMap(mutation -> stateService.getStatesByIds(mutation.getId(), domainId)
            // Reactor never emits null, so only the value needs checking
            .filter(state -> state.getValue() != null)
            .map(state -> {
                mutation.setTemplateId(state.getTemplateId());
                return mutation;
            }));
}
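This is just off the top of my head, and I have no idea what some of the return types are here (Flux or Mono) for your own APIs. If getStatesByIds() actually returns a Flux, you would need to take the first element with .next() before chaining onto it inside the Mono.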
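To make the whole chain concrete, here is a minimal runnable sketch, assuming reactor-core is on the classpath and that both lookups return a Mono. The Mutation and State classes and the two stub methods are hypothetical simplifications of your mutationProvider and stateService. It also adds switchIfEmpty to reproduce the "not found" exception from your blocking version, which is an assumption about the behaviour you want to keep.

```java
import reactor.core.publisher.Mono;

public class MutationChainSketch {

    // Hypothetical, simplified versions of the domain types
    static class Mutation {
        final int id;
        Integer templateId;
        Mutation(int id) { this.id = id; }
    }

    static class State {
        final String value;
        final Integer templateId;
        State(String value, Integer templateId) {
            this.value = value;
            this.templateId = templateId;
        }
    }

    // Stub for mutationProvider.findByGameIdAndTypeId(): gameId 0 means "not found"
    static Mono<Mutation> findByGameIdAndTypeId(int gameId, int typeId) {
        return gameId == 0 ? Mono.empty() : Mono.just(new Mutation(gameId * 10 + typeId));
    }

    // Stub for stateService.getStatesByIds(): domain 99 has a null value ("disabled")
    static Mono<State> getStatesByIds(int mutationId, int domainId) {
        return Mono.just(new State(domainId == 99 ? null : "on", 42));
    }

    static Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
        return findByGameIdAndTypeId(gameId, typeId)
                // Replaces the "mutation == null" check of the blocking version
                .switchIfEmpty(Mono.error(
                        new RuntimeException("Mutation was not found by gameId and typeId")))
                .flatMap(mutation -> getStatesByIds(mutation.id, domainId)
                        // A disabled state makes the whole Mono complete empty
                        .filter(state -> state.value != null)
                        .map(state -> {
                            mutation.templateId = state.templateId;
                            return mutation;
                        }));
    }

    public static void main(String[] args) {
        // block() is used here only to inspect results in a demo
        System.out.println(getMutation(1, 2, 7).block().templateId); // prints 42
        System.out.println(getMutation(1, 2, 99).block());           // prints null
    }
}
```

With a fully non-blocking chain like this, the subscribeOn(Schedulers.boundedElastic()) in getMutations() is probably no longer needed, provided the two lookups themselves do not block.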