I am trying to make three REST calls using CompletableFuture and return the result of the first one that matches a specific response. Below is sample test code I wrote for it (minus the REST calls), but it does not seem to work. I always see "future1" returned, even with the wait time, which means test2 and test3 are blocked behind it. How do I achieve this?
I thought of using CompletableFuture.anyOf, but that just returns the result of the first future that completes, not the first one that matches a specified response. Please advise.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Testing {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        CompletableFuture<String> combinedFuture = test("future1", 10000)
            .thenCompose(response1 -> test2()
                .thenCompose(response2 -> test3()
                    .thenApply(response3 -> {
                        return combine(response1, response2, response3);
                    })));
        System.out.println(combinedFuture.get());
    }

    private static CompletableFuture<String> test(String str, int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return str;
        });
    }

    private static CompletableFuture<String> test2() {
        return test("future2", 0);
    }

    private static CompletableFuture<String> test3() {
        return test("future3", 0);
    }

    private static String combine(String response1, String response2, String response3) {
        String responseString = null;
        if (response1 != null) {
            return response1;
        } else if (response2 != null) {
            return response2;
        } else if (response3 != null) {
            return response3;
        }
        return responseString;
    }
}
CodePudding user response:
You need a dedicated task that collects the results of the others and completes only when the desired condition is met.
For instance:
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class Testing {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String result = aggregator(
            Arrays.asList(
                test("future1", 10000),
                test("future2", 0),
                test("future3", 0)),
            (value) -> { return value != null; },
            "default value"
        ).get();
        System.out.println(result);
    }

    private static CompletableFuture<String> aggregator(Collection<CompletableFuture<String>> tasks, Predicate<String> validator, String defaultValue)
    {
        CompletableFuture<String> a = new CompletableFuture<String>();
        AtomicInteger count = new AtomicInteger(0);
        tasks.forEach((t) -> {
            t.whenComplete((value, error) -> {
                // First matching result wins; later complete() calls are no-ops.
                if( error == null && validator.test(value) ) a.complete(value);
                // Count only after the match attempt, so the default cannot overtake a match.
                if( count.incrementAndGet() == tasks.size() ) a.complete(defaultValue);
            });
        });
        return a;
    }

    private static CompletableFuture<String> test(String str, int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return str;
        });
    }
}
The aggregator method accepts any number of other tasks and checks the result of each using the validator provided. The first one that matches is returned immediately without waiting for the others. If none matches by the time all tasks have finished, it completes with the default value.
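To see both outcomes (a match and the fallback), here is a minimal self-contained sketch. It assumes a generic variant of the aggregator; the class name AggregatorDemo and the helpers delayed and calls are hypothetical names introduced only for this illustration, and the delays are shortened so it finishes quickly. It counts a task as finished only after the match attempt, so the default value cannot win a race against a matching result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class AggregatorDemo {

    // Generic variant of the aggregator (assumption: the answer's version is String-only).
    static <T> CompletableFuture<T> aggregator(
            List<CompletableFuture<T>> tasks, Predicate<T> validator, T defaultValue) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger finished = new AtomicInteger();
        for (CompletableFuture<T> task : tasks) {
            task.whenComplete((value, error) -> {
                if (error == null && validator.test(value)) {
                    result.complete(value); // first match wins
                }
                if (finished.incrementAndGet() == tasks.size()) {
                    result.complete(defaultValue); // no-op if a match already won
                }
            });
        }
        return result;
    }

    // Hypothetical stand-in for a REST call: returns `value` after `millis` ms.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static List<CompletableFuture<String>> calls() {
        return Arrays.asList(
            delayed("future1", 200), delayed("future2", 0), delayed("future3", 20));
    }

    public static void main(String[] args) {
        // Only "future3" satisfies the validator, so it is the result.
        System.out.println(
            aggregator(calls(), v -> v.equals("future3"), "default value").join());
        // Nothing satisfies the validator, so the default is used once all tasks finish.
        System.out.println(
            aggregator(calls(), v -> v.isEmpty(), "default value").join());
    }
}
```

With a real service, the validator would inspect the response body or status instead of comparing strings.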
CodePudding user response:
You can race the futures against each other and delegate completions to another one:
static <T> CompletableFuture<T> first(Stream<CompletableFuture<T>> futures) {
    var delegate = new CompletableFuture<T>();
    // Requires: import static java.util.concurrent.CompletableFuture.runAsync;
    runAsync(() ->
        futures.forEach(future ->
            future.handle(
                (value, error) -> {
                    // Check the error, not the value: a future may legitimately complete with null.
                    if (error != null) {
                        return delegate.completeExceptionally(error);
                    } else {
                        return delegate.complete(value);
                    }
                })));
    return delegate;
}
The future returned by first completes (either successfully or with an error) as soon as the first of the futures passed in the futures argument completes.
Now
CompletableFuture<String> combinedFuture =
first(Stream.of(test("future1", 10000), test2(), test3()));
System.out.println(combinedFuture.get());
prints either "future2" or "future3", depending on which one happens to complete first.
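Note that first reacts to whichever future completes first, regardless of its content. If the result also has to match a specific response, as in the question, the same delegate idea can carry a predicate. The following is only a sketch under assumptions: firstMatching, FirstMatchingDemo, and the choice to fail with NoSuchElementException when nothing matches are all hypothetical and not part of the answer above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class FirstMatchingDemo {

    // Completes with the first successful result accepted by `matches`.
    // Assumption: if every future finishes without a match, fail with NoSuchElementException.
    static <T> CompletableFuture<T> firstMatching(
            List<CompletableFuture<T>> futures, Predicate<T> matches) {
        CompletableFuture<T> delegate = new CompletableFuture<>();
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null && matches.test(value)) {
                    delegate.complete(value);
                }
                // Decrement after the match attempt so a failure cannot overtake a match.
                if (remaining.decrementAndGet() == 0) {
                    delegate.completeExceptionally(
                        new NoSuchElementException("no response matched"));
                }
            });
        }
        return delegate;
    }

    public static void main(String[] args) {
        // A future that never completes stands in for a slow call.
        CompletableFuture<String> pending = new CompletableFuture<>();
        List<CompletableFuture<String>> calls = Arrays.asList(
            pending,
            CompletableFuture.completedFuture("future2"),
            CompletableFuture.completedFuture("future3"));
        // "future2" is rejected by the predicate; "future3" is accepted without waiting for `pending`.
        System.out.println(firstMatching(calls, v -> v.endsWith("3")).join());
    }
}
```

Whether "no match" should surface as an exception or as a default value depends on how the caller wants to handle it.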