Return the future that gets executed first with a specific condition on the response

Time:08-21

I am trying to make three REST calls using CompletableFutures and return the result of the first one that matches a specific response. Below is the sample test code I wrote (minus the REST calls), but it does not work. I always see "future1" returned despite its 10-second delay, which means test2 and test3 are waiting on it. How can I achieve this?

I thought of using CompletableFuture.anyOf, but that just returns the result of the first future that completes, not the first one that matches a specified response. Please advise.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Testing {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        CompletableFuture<String> combinedFuture = test("future1", 10000)
                .thenCompose(response1 -> test2()
                        .thenCompose(response2 -> test3()
                                .thenApply(response3 -> {
                                    return combine(response1, response2, response3);
                                })));
        System.out.println(combinedFuture.get());
    }

    private static CompletableFuture<String> test(String str, int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return str;
        });
    }

    private static CompletableFuture<String> test2() {
        return test("future2", 0);
    }

    private static CompletableFuture<String> test3() {
        return test("future3", 0);
    }

    private static String combine(String response1, String response2, String response3) {
        String responseString = null;
        if (response1 != null) {
            return response1;
        } else if (response2 != null) {
            return response2;
        } else if (response3 != null) {
            return response3;
        }
        return responseString;
    }

}

CodePudding user response:

You need a dedicated future that collects the results of the others and completes only when the desired condition is met.

For instance:

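import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class Testing {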
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        
        String result = aggregator(
                Arrays.asList(
                        test("future1", 10000),
                        test("future2", 0),
                        test("future3", 0)),
                (value) -> { return value != null; },
                "default value"
                ).get();
        
        System.out.println(result);
    }
    
    private static CompletableFuture<String> aggregator(Collection<CompletableFuture<String>> tasks, Predicate<String> validator, String defaultValue)
    {
        CompletableFuture<String> a = new CompletableFuture<String>();
        AtomicInteger count = new AtomicInteger(0);
        
        tasks.forEach((t) -> {
            t.whenComplete((value, error) -> { 
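                // Offer the value first, then count: the task that finishes last can
                // only apply the default after every match has already been offered.
                if( error == null && validator.test(value) ) a.complete(value);
                if( count.incrementAndGet() == tasks.size() ) a.complete(defaultValue);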
            }); 
        }); 
        
        return a;
    }

    private static CompletableFuture<String> test(String str, int i) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return str;
        });
    }
}

The aggregator method accepts any number of tasks and tests the result of each with the provided validator. The first result that passes is returned immediately, without waiting for the others.

If none match, it completes with the default value once the last task has finished.
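To make both outcomes concrete, here is a minimal sketch of a generic variant of the aggregator. The type parameter, the class name AggregatorDemo and the delayed helper are assumptions made for illustration, not part of the answer above. It counts a task as finished only after its value has been offered, so the default cannot overtake a match.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class AggregatorDemo {

    // Hypothetical generic variant: completes with the first value accepted by
    // the validator, or with defaultValue once every task has finished.
    static <T> CompletableFuture<T> aggregator(List<CompletableFuture<T>> tasks,
                                               Predicate<T> validator,
                                               T defaultValue) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger finished = new AtomicInteger();
        for (CompletableFuture<T> task : tasks) {
            task.whenComplete((value, error) -> {
                if (error == null && validator.test(value)) {
                    result.complete(value);        // only the first complete() has an effect
                }
                if (finished.incrementAndGet() == tasks.size()) {
                    result.complete(defaultValue); // no-op if a match was already delivered
                }
            });
        }
        return result;
    }

    // Hypothetical stand-in for a REST call: yields the value after a delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> tasks = List.of(
                delayed("slow", 300), delayed("rejected", 0), delayed("accepted", 50));

        // Exactly one value passes the validator, so it is the result.
        System.out.println(aggregator(tasks, v -> v.equals("accepted"), "default value").join());

        // Nothing passes, so the default is returned after the slowest task finishes.
        System.out.println(aggregator(tasks, v -> false, "default value").join());
    }
}
```

The sketch assumes a non-empty task list and a validator that does not throw. With an empty list no callback ever runs, so the returned future would never complete.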

CodePudding user response:

You can race the futures against each other and delegate completions to another one:

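static <T> CompletableFuture<T> first(Stream<CompletableFuture<T>> futures) {
    var delegate = new CompletableFuture<T>();

    // Registering the callbacks does not block; runAsync only moves that work off the caller's thread.
    CompletableFuture.runAsync(() ->
         futures.forEach(future ->
             future.handle(
                 (value, error) -> {
                     // Test the error, not the value: a future may legitimately complete with null.
                     if (error != null) {
                         return delegate.completeExceptionally(error);
                     } else {
                         return delegate.complete(value);
                     }
                 })));

    return delegate;
}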

The future returned by first completes, either successfully or with an error, as soon as the first of the futures passed in completes. It applies no condition to the response: whichever future finishes first wins.

Now

CompletableFuture<String>  combinedFuture =
    first(Stream.of(test("future1", 10000), test2(), test3()));

System.out.println(combinedFuture.get());

prints either "future2" or "future3", depending on which one happens to complete first.
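Since first does not look at the value, the question's "specific condition" still has to be added. One possible sketch, under assumptions: the firstMatching name, the Predicate parameter, the delayed helper and the choice of NoSuchElementException for the no-match case are all hypothetical and not part of the answer above. Each future gets a check that offers its value to the delegate, and allOf over those checks detects that every future has finished without a match.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

class FirstMatchingDemo {

    // Hypothetical helper: completes with the first value accepted by the predicate.
    // Failed futures and rejected values are skipped; if nothing matches, the
    // returned future completes exceptionally with NoSuchElementException.
    static <T> CompletableFuture<T> firstMatching(List<CompletableFuture<T>> futures,
                                                  Predicate<T> accept) {
        CompletableFuture<T> delegate = new CompletableFuture<>();

        // Each check finishes only after its callback has run, so allOf(checks)
        // cannot complete before every matching value has been offered.
        CompletableFuture<?>[] checks = futures.stream()
                .map(future -> future.handle((value, error) -> {
                    if (error == null && accept.test(value)) {
                        delegate.complete(value);
                    }
                    return null;
                }))
                .toArray(CompletableFuture[]::new);

        // No effect if a match already completed the delegate.
        CompletableFuture.allOf(checks).whenComplete((ignored, error) ->
                delegate.completeExceptionally(
                        new NoSuchElementException("no future produced a matching value")));

        return delegate;
    }

    // Hypothetical stand-in for a REST call: yields the value after a delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = firstMatching(
                List.of(delayed("future1", 300), delayed("future2", 0), delayed("future3", 50)),
                value -> value.endsWith("3"));
        System.out.println(result.join()); // prints "future3", the only accepted value
    }
}
```

Failing with an exception rather than returning a default keeps "nothing matched" distinguishable from a real response. A caller that prefers a fallback can chain exceptionally(e -> someDefault) on the returned future.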
