Home > Software design >  Vertx CompositeFuture: on completion of all Futures
Vertx CompositeFuture: on completion of all Futures

Time:04-20

In a Vert.x web server, I have a set of Futures, which each one could either fail or succeed and hold a result. I am interested in the outcome (and possibly the result) of each and every one of those Futures, meaning I need to process the result of each Future.

I was thinking that Vert.x's CompositeFuture was the way to go, this is my code snippet:

List<Future> futures = dataProviders.stream()
    .filter(dp -> dp.isActive(requester))
    .map(DataProvider::getData)
    .collect(Collectors.toList());

CompositeFuture.all(futures)
        .onComplete(ar -> {
            if(ar.failed()) {
                routingContext.response()
                    .end(ar.cause());
                return;
            }
            
            CompositeFuture cf = ar.result();
            JsonArray data = new JsonArray();
            for(int i = 0; i < cf.size(); i  ) {
                if(cf.failed(i)) {
                    final JsonObject errorJson = new JsonObject();
                    errorJson.put("error", cf.cause(i).getMessage());
                    data.add(errorJson);
                } else {
                    data.add(((Data) cf.resultAt(i)).toJson());
                }
            }

            JsonObject res = new JsonObject()
                .put("data", data);

            routingContext.response()
                    .putHeader("Content-Type", "application/json")
                    .end(res.toString());
        });

but with that I get the following problems:

  • Using CompositeFuture.all(futures).onComplete(), I don't get the results of a succeeded Future as soon as any Future out of futures fails (because then ar.result() is null).
  • Using CompositeFuture.any(futures).onComplete(), I would get all results, but the CompositeFuture completes before all Futures of futures are completed. Meaning, it does not wait for every Future to complete, but completes as soon as any Future is completed. (-> cf.resultAt(i) returns null)
  • Using CompositeFuture.join(futures).onComplete(), it's the same as with all(): ar.result() is null as soon as any Future fails.

What is the correct/best way to wait for a list of Futures to complete, while being able to then handle each result and outcome individually?

CodePudding user response:

The easiest is if you are handling the results by yourself. You can register a onSuccess handler to your futures. This way the results will be put in some kind of list, e.g. JsonArray.

List<Future> futures = //...list of futures

JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));

CompositeFuture.all(futures)
    .onComplete(ar -> {
        if(ar.failed()) {
            // successful elements are present in "results"
            routingContext.response().end(results.encode());
            return;
        }
        //... rest of your code
     });

You can also look into the rx-java library. Such use cases are usually better implemented using it.

CodePudding user response:

You can simply poke the original futures for it's results when using all:

List<Future> futures = //...list of futures

CompositeFuture.all(futures).onComplete(ar -> {
  if(ar.succeeded()){
    futures.forEach(fut -> log.info( fut.succeded()  " / "  _fut.result() ));
  }
} );
  • Related