I am trying to refactor code that waits on multiple futures one after another so that it waits for all of them jointly.
To do that, I wait on the futures with a single timeout using CompletableFuture.allOf():
// Example outcomes
final CompletableFuture<String> completedFuture
        = CompletableFuture.completedFuture("hello");
final CompletableFuture<String> failedFuture
        = new CompletableFuture<>();
failedFuture.completeExceptionally(new RuntimeException("Test Stub Exception"));
final CompletableFuture<String> incompleteFuture
        = new CompletableFuture<>();

// records whether the handler on incompleteFuture saw a TimeoutException
final AtomicBoolean timeoutHandled = new AtomicBoolean(false);
final CompletableFuture<String> checkedFuture
        = incompleteFuture.whenComplete(
                (x, e) -> timeoutHandled.set(e instanceof TimeoutException));

// this example times out after 1 ms
try {
    CompletableFuture
            .allOf(completedFuture, checkedFuture, failedFuture)
            .get(1, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (final TimeoutException e) {
    // probably do something here?
}

// but the incomplete future is still pending
assertTrue(checkedFuture.isCompletedExceptionally());
// this still fails even if checkedFuture.completeExceptionally(e) is called
assertTrue(timeoutHandled.get());
However, the first assert above fails: the combined future timed out, but the individual future is still incomplete. I would like to complete such futures exactly as if each had timed out on its own, because they may have their own whenComplete() handlers that handle TimeoutException. The assertion error is:
Expecting
<CompletableFuture[Incomplete]>
to be completed exceptionally.
Is there a useful and safe pattern by which I can loop over all futures and invoke completeExceptionally() to simulate a timeout in each of them, and make sure all "exception handlers" have been invoked before moving on?
Answer:
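You can wrap your try/catch in a varargs method that, on timeout, loops over each CompletableFuture and invokes completeExceptionally() on it. Futures that have already completed are unaffected, because completeExceptionally() only takes effect on a future that is not yet complete.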
static void completeFutures(CompletableFuture<?>... completableFutures) throws ExecutionException {
    try {
        CompletableFuture.allOf(completableFutures).get(1, TimeUnit.MILLISECONDS);
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (final TimeoutException e) {
        // propagate the timeout to each future
        for (CompletableFuture<?> cf : completableFutures) {
            cf.completeExceptionally(e);
        }
    }
}
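
One thing to watch for with this approach: a handler registered with whenComplete() runs when the future it was registered on completes, not when the stage returned by whenComplete() is completed. That is likely why timeoutHandled stays false in the question even after calling checkedFuture.completeExceptionally(e). Below is a minimal sketch of the same loop applied to the source futures instead. The class name JointTimeoutSketch and the helper awaitAllOrFail are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class JointTimeoutSketch {

    /**
     * Hypothetical helper: waits for all futures with one shared timeout.
     * On timeout, every future that is still pending is completed
     * exceptionally with the TimeoutException. Returns true if the timeout fired.
     */
    public static boolean awaitAllOrFail(long timeout, TimeUnit unit,
                                         CompletableFuture<?>... sources)
            throws InterruptedException {
        try {
            CompletableFuture.allOf(sources).get(timeout, unit);
            return false;
        } catch (ExecutionException e) {
            // allOf only completes once every future has completed,
            // so nothing is pending here; a future failed on its own.
            return false;
        } catch (TimeoutException e) {
            for (CompletableFuture<?> cf : sources) {
                // Has no effect (returns false) on futures that already completed.
                cf.completeExceptionally(e);
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> completed = CompletableFuture.completedFuture("hello");
        CompletableFuture<String> incomplete = new CompletableFuture<>();

        AtomicBoolean timeoutHandled = new AtomicBoolean(false);
        CompletableFuture<String> checked = incomplete.whenComplete(
                (x, e) -> timeoutHandled.set(e instanceof TimeoutException));

        // Pass the source future, not the stage returned by whenComplete().
        boolean timedOut = awaitAllOrFail(1, TimeUnit.MILLISECONDS, completed, incomplete);

        System.out.println(timedOut);
        System.out.println(timeoutHandled.get());
        System.out.println(checked.isCompletedExceptionally());
    }
}
```

In this sketch all three values should print as true. Non-async handlers run on the thread that calls completeExceptionally(), so they have finished by the time the loop returns; handlers registered with the ...Async variants may still be running, and joining on the dependent stages afterwards is one way to wait for them. Stages further downstream see the TimeoutException wrapped in a CompletionException. On Java 9 or later, calling orTimeout() on each source future may be a simpler alternative.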