Spring Reactive: how to wait for all Monos to finish?

Time:10-31

I have the following code, in which I call external APIs via WebClient; each call returns a Mono. I need to run some logic as each response arrives and then, once all requests have been processed, run one more piece of logic over all the gathered data. I could collect all the Monos into a Flux and run the final logic at the end, but the serviceName field is only accessible inside the loop, so the per-Mono logic has to run there. This is where I'm stuck: I don't know how to wait for all the data to complete in a reactive way.

@Scheduled(fixedDelay = 50000)
public void refreshSwaggerConfigurations() {
  log.debug("Starting Service Definition Context refresh");
  List<SwaggerServiceData> allServicesApi = new ArrayList<>();

  swaggerProperties.getUrls().forEach((serviceName, serviceSwaggerUrl) -> {
    log.debug("Attempting service definition refresh for Service : {} ", serviceName);

    Mono<SwaggerServiceData> swaggerData = getSwaggerDefinitionForAPI(serviceName,
        serviceSwaggerUrl);

    swaggerData.subscribe(swaggerServiceData -> {
      if (swaggerServiceData != null) {
        allServicesApi.add(swaggerServiceData);
        String content = getJSON(swaggerServiceData);
        definitionContext.addServiceDefinition(serviceName, content);
      } else {
        log.error("Skipping service id : {} Error : Could not get Swagger definition from API ",
            serviceName);
      }
    });
  });
  // I need to wait here for all Monos to complete and only then process all the gathered data.
  // Right now the list is still empty at this point. I know why, I just don't know how to fix it.
  Optional<SwaggerServiceData> swaggerAllServicesData = getAllServicesApiSwagger(allServicesApi);
  if (swaggerAllServicesData.isPresent()) {
    String allApiContent = getJSON(swaggerAllServicesData.get());
    definitionContext.addServiceDefinition("All", allApiContent);
  }
}


private Mono<SwaggerServiceData> getSwaggerDefinitionForAPI(String serviceName, String url) {
  log.debug("Accessing the SwaggerDefinition JSON for Service : {} : URL : {} ", serviceName,
      url);
  Mono<SwaggerServiceData> swaggerServiceDataMono = webClient.get()
      .uri(url)
      .exchangeToMono(clientResponse -> clientResponse.bodyToMono(SwaggerServiceData.class));
  return swaggerServiceDataMono;
}

CodePudding user response:

I have a simple Java program that sends multiple requests with Spring WebClient. Each request returns a Mono, and I use response.subscribe() to check the result.

However, my main thread of execution finishes before all requests are processed, unless I add a long Thread.sleep().

With CompletableFuture you can wait for all of them using: CompletableFuture.allOf(futures).join();
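
As a minimal sketch of that CompletableFuture approach: the class name AllOfSketch, the call helper, and the service names are hypothetical stand-ins, not part of the original program. A Mono can be bridged to this style with its toFuture() method, although doing so leaves the reactive pipeline.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical stand-in for a remote call that completes asynchronously.
    static CompletableFuture<String> call(String name) {
        return CompletableFuture.supplyAsync(() -> "response-from-" + name);
    }

    // Starts every call, waits for all of them, then gathers the results in input order.
    static List<String> callAll(List<String> names) {
        List<CompletableFuture<String>> futures = names.stream()
                .map(AllOfSketch::call)
                .collect(Collectors.toList());

        // Blocks the calling thread until every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Every future is already done here, so join() returns immediately.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(callAll(List.of("orders", "users")));
    }
}
```

Note that join() blocks the calling thread, so this fits a plain main method or a scheduler thread better than code running on an event loop.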

CodePudding user response:

I would add a temporary class to group the data and the service name:

record SwaggerService(SwaggerServiceData swaggerServiceData, String serviceName) {
    boolean hasData() {
        return swaggerServiceData != null;
    }
}

Then change your pipeline:

Flux.fromStream(swaggerProperties.getUrls().entrySet().stream())
                .flatMap((e) -> {
                    Mono<SwaggerServiceData> swaggerDefinitionForAPI = getSwaggerDefinitionForAPI(e.getKey(),
                            e.getValue());
                    return swaggerDefinitionForAPI.map(swaggerServiceData -> new SwaggerService(swaggerServiceData, e.getKey()));
                })
                .filter(SwaggerService::hasData)
                .map(swaggerService -> {
                    String content = getJSON(swaggerService.swaggerServiceData());
                    definitionContext.addServiceDefinition(swaggerService.serviceName(), content);
                    return swaggerService.swaggerServiceData();
                })
                // collect all the data here; it is emitted as a single Mono holding a list of SwaggerServiceData
                .collectList()
                .map(this::getAllServicesApiSwagger)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .subscribe(e -> {
                    String allApiContent = getJSON(e);
                    definitionContext.addServiceDefinition("All", allApiContent);
                });

This does not log an error when SwaggerServiceData is null, but you can extend it if you want. I also assume that DefinitionContext is thread-safe.
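
One possible way to add the missing error logging, as a sketch under assumptions rather than a drop-in replacement: a Mono never emits null (Reactive Streams forbids it), so a failed call typically surfaces as an error signal, which onErrorResume can log and skip. In the code below, fetch is a hypothetical stand-in for getSwaggerDefinitionForAPI, String stands in for SwaggerServiceData, the class name and URLs are made up, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RefreshSketch {

    // Hypothetical stand-in for the WebClient call; an empty URL simulates a failing service.
    static Mono<String> fetch(String serviceName, String url) {
        if (url.isEmpty()) {
            return Mono.error(new IllegalStateException("no Swagger URL configured"));
        }
        return Mono.just("definition-of-" + serviceName);
    }

    // Runs all calls, logs and skips the failing ones, and emits the list once every call has finished.
    static Mono<List<String>> refreshAll(Map<String, String> urls) {
        return Flux.fromIterable(urls.entrySet())
                .flatMap(e -> fetch(e.getKey(), e.getValue())
                        .onErrorResume(ex -> {
                            // The service name is still in scope here, so it can be logged per call.
                            System.err.println("Skipping service id : " + e.getKey()
                                    + " Error : " + ex.getMessage());
                            return Mono.empty();
                        }))
                .collectList();
    }

    public static void main(String[] args) {
        Map<String, String> urls = Map.of(
                "orders", "http://orders.example/v3/api-docs",
                "broken", "");
        // block() is used only to keep this demo alive until the result arrives.
        List<String> all = refreshAll(urls).block();
        System.out.println(all);
    }
}
```

Because each error is handled inside flatMap, one failing service does not cancel the others, and collectList still completes with the definitions that were fetched successfully.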
