I'm totally new to the Java Reactor API.
I use a WebClient to retrieve data from an external webservice, which I then map to a DTO of class "LearnDetailDTO".
But before sending back this DTO, I have to modify it with data I get from another webservice. For this, I chain the calls with flatMap(). I get my data from the second webservice, but my DTO is returned before it is modified with the new data.
My problem is: how do I wait until all calls to the second webservice have finished and the DTO has been modified before sending it back to the caller?
Here is my code:
class Controller {
    @GetMapping(value = "/learn/detail/", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<LearnDetailDTO> getLearnDetail() {
        return getLearnDetailDTO();
    }

    private Mono<LearnDetailDTO> getLearnDetailDTO() {
        WebClient client = WebClient.create("https://my_rest_webservice.com");
        return client
            .get()
            .retrieve()
            .bodyToMono(LearnDetailDTO.class)
            .flatMap(learnDetailDTO -> {
                LearnDetailDTO newDto = new LearnDetailDTO(learnDetailDTO);
                for (GroupDTO group : newDto.getGroups()) {
                    String keyCode = group.getKeyCode();
                    for (GroupDetailDto detail : group.getGroupsDetailList()) {
                        adeService.getResourcesList(keyCode) // one asynchronous rest call to get resources
                            .flatMap(resource -> {
                                Long id = resource.getData().get(0).getId();
                                return adeService.getEventList(id); // another asynchronous rest call to get an events list with the resource coming from the previous call
                            })
                            .subscribe(event -> {
                                detail.setCreneaux(event.getData());
                            });
                    }
                }
                return Mono.just(newDto);
            });
    }
}
I tried to block() on my call to adeService.getEventList() instead of using subscribe(), but I get the following error:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
How can I be sure that my newDto object is complete before returning it?
Answer:
You should not mutate objects in subscribe. The function passed to subscribe is called asynchronously, at an unknown time in the future.
Subscribe should be considered a terminal operation, and should only serve to connect to other parts of your system. It should not modify values inside the scope of your datastream.
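To make the timing issue concrete, here is a minimal sketch. It assumes a hypothetical slowCall() that stands in for a remote call and emits after 100 ms; none of these names come from the question. The first method shows what the caller sees when the value is written from subscribe, the second keeps the value inside the pipeline result.

```java
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubscribeTimingDemo {

    // Hypothetical stand-in for a remote call: emits its value after 100 ms.
    static Mono<String> slowCall() {
        return Mono.delay(Duration.ofMillis(100)).map(tick -> "event");
    }

    // Anti-pattern: subscribe() returns immediately, so the callback has not
    // run yet when this method returns. The returned list is a snapshot of
    // what the caller can see at that moment.
    static List<String> fireAndForget() {
        List<String> target = new CopyOnWriteArrayList<>();
        slowCall().subscribe(target::add);
        return List.copyOf(target);
    }

    // Composed version: the value is part of the pipeline result, so whoever
    // subscribes to the returned Mono only gets the list once it is filled.
    static Mono<List<String>> composed() {
        return slowCall().map(value -> List.of(value));
    }

    public static void main(String[] args) {
        System.out.println("fireAndForget: " + fireAndForget());
        // block() is used only because this is a main method.
        System.out.println("composed: " + composed().block(Duration.ofSeconds(1)));
    }
}
```

This is the same situation as in the question: Mono.just(newDto) is emitted as soon as the loops have finished subscribing, long before any of the callbacks have run.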
What you want is a pipeline that collects all events and then maps them to a DTO containing the collected events.
As a rule of thumb, your pipeline result must be composed of the results accumulated along the operation chain. You should never have a "subscribe" in the middle of the operation chain, and you should never mutate an object with it.
I will provide a simplified example so you can take time to analyze the logic that can reach the goal: accumulate new values asynchronously in a single result. In this example, I've removed any notion of "detail" to connect directly groups to events, to simplify the overall code.
The snippet:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class AccumulateProperly {

    // Data object definitions
    record Event(String data) {}
    record Resource(int id) {}
    record Group(String keyCode, List<Event> events) {
        // When adding events, do not mutate the object directly. Instead, create a derived version
        Group merge(List<Event> newEvents) {
            var allEvents = new ArrayList<>(events);
            allEvents.addAll(newEvents);
            return new Group(keyCode, allEvents);
        }
    }
    record MyDto(List<Group> groups) { }

    static Flux<Resource> findResourcesByKeyCode(String keyCode) {
        return Flux.just(new Resource(1), new Resource(2));
    }

    static Flux<Event> findEventById(int id) {
        return Flux.just(
            new Event("resource_" + id + "_event_1"),
            new Event("resource_" + id + "_event_2")
        );
    }

    public static void main(String[] args) {
        MyDto dtoInstance = new MyDto(List.of(new Group("myGroup", List.of())));
        System.out.println("INITIAL STATE:");
        System.out.println(dtoInstance);

        // Asynchronous operation pipeline
        Mono<MyDto> dtoCompletionPipeline = Mono.just(dtoInstance)
            .flatMap(dto -> Flux.fromIterable(dto.groups())
                // For each group, find associated resources
                .flatMap(group -> findResourcesByKeyCode(group.keyCode())
                    // For each resource, fetch its associated events
                    .flatMap(resource -> findEventById(resource.id()))
                    // Collect all events for the group
                    .collectList()
                    // Accumulate collected events in a new instance of the group
                    .map(group::merge)
                )
                // Collect all groups after they've collected events
                .collectList()
                // Build a new dto instance from the completed set of groups
                .map(completedGroups -> new MyDto(completedGroups))
            );

        // NOTE: block is here only because we are in a main function and I want to print
        // the pipeline output before the program exits.
        // Try to avoid block. Return your Mono, or connect it to another Mono or Flux object using
        // an operation like flatMap.
        dtoInstance = dtoCompletionPipeline.block(Duration.ofSeconds(1));
        System.out.println("OUTPUT STATE:");
        System.out.println(dtoInstance);
    }
}
Its output:
INITIAL STATE:
MyDto[groups=[Group[keyCode=myGroup, events=[]]]]
OUTPUT STATE:
MyDto[groups=[Group[keyCode=myGroup, events=[Event[data=resource_1_event_1], Event[data=resource_1_event_2], Event[data=resource_2_event_1], Event[data=resource_2_event_2]]]]]
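The same pattern can be carried back to the structure in the question, where events belong to details nested inside groups. What follows is only a sketch under stated assumptions: the records Event, Detail, Group and LearnDetail are simplified stand-ins for the real DTOs, and findFirstResourceId and findEvents are hypothetical stubs in place of adeService.getResourcesList and adeService.getEventList.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class NestedAccumulation {

    // Hypothetical, simplified stand-ins for the question's DTOs
    record Event(String data) {}
    record Detail(String name, List<Event> creneaux) {}
    record Group(String keyCode, List<Detail> details) {}
    record LearnDetail(List<Group> groups) {}

    // Hypothetical stub: id of the first resource found for a key code
    static Mono<Long> findFirstResourceId(String keyCode) {
        return Mono.just((long) keyCode.length());
    }

    // Hypothetical stub: events attached to a resource
    static Mono<List<Event>> findEvents(long resourceId) {
        return Mono.just(List.of(new Event("event_for_resource_" + resourceId)));
    }

    // One detail: chain both calls, then build a new Detail holding the events
    static Mono<Detail> completeDetail(String keyCode, Detail detail) {
        return findFirstResourceId(keyCode)
                .flatMap(id -> findEvents(id))
                .map(events -> new Detail(detail.name(), events));
    }

    // One group: complete every detail, then rebuild the group
    static Mono<Group> completeGroup(Group group) {
        return Flux.fromIterable(group.details())
                // concatMap runs the calls one after another and keeps the order
                .concatMap(detail -> completeDetail(group.keyCode(), detail))
                .collectList()
                .map(details -> new Group(group.keyCode(), details));
    }

    // Whole DTO: complete every group, then rebuild the DTO
    static Mono<LearnDetail> complete(LearnDetail dto) {
        return Flux.fromIterable(dto.groups())
                .concatMap(NestedAccumulation::completeGroup)
                .collectList()
                .map(LearnDetail::new);
    }

    public static void main(String[] args) {
        LearnDetail input = new LearnDetail(List.of(
                new Group("abc", List.of(
                        new Detail("d1", List.of()),
                        new Detail("d2", List.of())))));
        // block() is used only because this is a main method.
        System.out.println(complete(input).block(Duration.ofSeconds(1)));
    }
}
```

In a controller, a method like complete would be chained after bodyToMono(...) with flatMap and the resulting Mono returned as is, with no subscribe or block in between. concatMap is chosen here to preserve list order; flatMapSequential would be an option if the calls should run concurrently while still keeping the order.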