Home > OS >  Java Reactor API: how to wait for an object to be modified by asynchronous calls to be completely mo
Java Reactor API: how to wait for an object to be modified by asynchronous calls to be completely mo

Time:08-31

I'm totally new to the Java Reactor API.

I use a WebClient to retrieve data from an external webservice, which I then map to a DTO of class "LearnDetailDTO".

But before sending back this DTO, I have to modify it with data I get from another webservice. For this, I chain the calls with flatMap(). I get my data from the second webservice, but my DTO is returned before it is modified with the new data.

My problem is: how to wait until all calls to the second webservice are finished and the DTO is modified before sending it back to the caller?

Here is my code:

class Controller {

   @GetMapping(value = "/learn/detail/", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<LearnDetailDTO> getLearnDetail() {

        return getLearnDetailDTO();
    }
    
    private Mono<LearnDetailDTO> getLearnDetailDTO() {
       
        
        WebClient client = WebClient.create("https://my_rest_webservice.com");

        return client
            .get()
            .retrieve()
            .bodyToMono(LearnDetailDTO.class)
            .flatMap(learnDetailDTO -> {
                LearnDetailDTO newDto = new LearnDetailDTO(learnDetailDTO );

                for (GroupDTO group : newDto.getGroups()) {
                    String keyCode = group.getKeyCode();

                    for (GroupDetailDto detail : group.getGroupsDetailList()) {
                        
                              adeService.getResourcesList(keyCode) // one asynchonous rest call to get resources
                                 .flatMap(resource -> {
                                     Long id = resource.getData().get(0).getId();
                                     return adeService.getEventList(id); // another asynchronous rest call to get an events list with the resource coming from the previous call
                                  })
                                  .subscribe(event -> {
                                     detail.setCreneaux(event.getData());
                                  });
                    }
                }

                return Mono.just(newDto);
            });

}

I tried to block() my call to adeservice.getEventList() instead of subscribe(), but I get the following error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

How to be sure that my newDTO object is complete before returning it ?

CodePudding user response:

You should not mutate objects in subscribe. The function passed to subscribe will be called asynchronously in an unknown time in the future.

Subscribe should be considered a terminal operation, and should only serve to connect to other part of your system. It should not modify values inside the scope of your datastream.

What you want, is a pipeline that collects all events, and then map them to a dto with collected events.

As a rule of thumb your pipeline result must be composed of accumulated results in the operation chain. You should never have a "subscribe" in the middle of the operation chain, and you should never mutate an object with it.

I will provide a simplified example so you can take time to analyze the logic that can reach the goal: accumulate new values asynchronously in a single result. In this example, I've removed any notion of "detail" to connect directly groups to events, to simplify the overall code.

The snippet:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class AccumulateProperly {

    // Data object definitions

    record Event(String data) {}
    record Resource(int id) {}

    record Group(String keyCode, List<Event> events) {
        // When adding events, do not mute object directly. Instead, create a derived version
        Group merge(List<Event> newEvents) {
            var allEvents = new ArrayList<>(events);
            allEvents.addAll(newEvents);
            return new Group(keyCode, allEvents);
        }
    }

    record MyDto(List<Group> groups) { }

    static Flux<Resource> findResourcesByKeyCode(String keyCode) {
        return Flux.just(new Resource(1), new Resource(2));
    }

    static Flux<Event> findEventById(int id) {
        return Flux.just(
                new Event("resource_" id "_event_1"),
                new Event("resource_" id "_event_2")
        );
    }

    public static void main(String[] args) {
        MyDto dtoInstance = new MyDto(List.of(new Group("myGroup", List.of())));
        System.out.println("INITIAL STATE:");
        System.out.println(dtoInstance);

        // Asynchronous operation pipeline
        Mono<MyDto> dtoCompletionPipeline = Mono.just(dtoInstance)
                .flatMap(dto -> Flux.fromIterable(dto.groups)
                            // for each group, find associated resources
                            .flatMap(group -> findResourcesByKeyCode(group.keyCode())
                                    // For each resource, fetch its associated event
                                    .flatMap(resource -> findEventById(resource.id()))
                                    // Collect all events for the group
                                    .collectList()
                                    // accumulate collected events in a new instance of the group
                                    .map(group::merge)
                            )
                            // Collect all groups after they've collected events
                            .collectList()
                            // Build a new dto instance from the completed set of groups
                            .map(completedGroups -> new MyDto(completedGroups))
                );


        // NOTE: block is here only because we are in a main function and that I want to print
        // pipeline output before program extinction.
        // Try to avoid block. Return your mono, or connect it to another Mono or Flux object using
        // an operation like flatMap.
        dtoInstance = dtoCompletionPipeline.block(Duration.ofSeconds(1));
        System.out.println("OUTPUT STATE:");
        System.out.println(dtoInstance);
    }
}

Its output:

INITIAL STATE:
MyDto[groups=[Group[keyCode=myGroup, events=[]]]]
OUTPUT STATE:
MyDto[groups=[Group[keyCode=myGroup, events=[Event[data=resource_1_event_1], Event[data=resource_1_event_2], Event[data=resource_2_event_1], Event[data=resource_2_event_2]]]]]
  • Related