In a Vert.x web server, I have a set of Futures, which each one could either fail or succeed and hold a result. I am interested in the outcome (and possibly the result) of each and every one of those Futures, meaning I need to process the result of each Future.
I was thinking that Vert.x's CompositeFuture
was the way to go, this is my code snippet:
List<Future> futures = dataProviders.stream()
.filter(dp -> dp.isActive(requester))
.map(DataProvider::getData)
.collect(Collectors.toList());
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
routingContext.response()
.end(ar.cause());
return;
}
CompositeFuture cf = ar.result();
JsonArray data = new JsonArray();
for(int i = 0; i < cf.size(); i ) {
if(cf.failed(i)) {
final JsonObject errorJson = new JsonObject();
errorJson.put("error", cf.cause(i).getMessage());
data.add(errorJson);
} else {
data.add(((Data) cf.resultAt(i)).toJson());
}
}
JsonObject res = new JsonObject()
.put("data", data);
routingContext.response()
.putHeader("Content-Type", "application/json")
.end(res.toString());
});
but with that I get the following problems:
- Using
CompositeFuture.all(futures).onComplete()
, I don't get the results of a succeeded Future as soon as any Future out offutures
fails (because thenar.result()
is null). - Using
CompositeFuture.any(futures).onComplete()
, I would get all results, but the CompositeFuture completes before all Futures offutures
are completed. Meaning, it does not wait for every Future to complete, but completes as soon as any Future is completed. (->cf.resultAt(i)
returns null) - Using
CompositeFuture.join(futures).onComplete()
, it's the same as withall()
:ar.result()
is null as soon as any Future fails.
What is the correct/best way to wait for a list of Futures to complete, while being able to then handle each result and outcome individually?
CodePudding user response:
The easiest is if you are handling the results by yourself. You can register a onSuccess
handler to your futures. This way the results will be put in some kind of list, e.g. JsonArray
.
List<Future> futures = //...list of futures
JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
// successful elements are present in "results"
routingContext.response().end(results.encode());
return;
}
//... rest of your code
});
You can also look into the rx-java
library. Such use cases are usually better implemented using it.
CodePudding user response:
You can simply poke the original futures for it's results when using all
:
List<Future> futures = //...list of futures
CompositeFuture.all(futures).onComplete(ar -> {
if(ar.succeeded()){
futures.forEach(fut -> log.info( fut.succeded() " / " _fut.result() ));
}
} );