Home > Software design >  How to Obtain the Exception Outside of CompletableFuture Handler?
How to Obtain the Exception Outside of CompletableFuture Handler?

Time:02-25

I have the following situation where I'm trying to see if there is a solution for:

  • Two Spring service calls must be made in parallel (one is an existing service call/logic and the second is the new addition).
  • The results should then be merged and returned by the RESTful API.

A happy path should be straightforward, however, when it comes to errors emitted by the services the following rule should adhere to:

  • The API fails only when both service calls fail -- this should be thrown from the main thread and not the @Async pool since they are independent threads and don't have access to each other's exception (at least that's my reasoning).

  • If only one of them fails, log the error through another service (asynchronously), and the API returns only the results from a service that was successful -- this can be done from the respective @Async threads.

    @Service
    public class Serv1 interface ServInf {
     @Async("customPool")
     public CompletableFuture<List<Obj>> getSomething(int id) {
       // The service ensures that the list is never null, but it can be empty
       return CompletableFuture.completedFuture(/* calling an external RESTful API */);
     }
    }
    
    @Service
    public class Serv2 interface ServInf {
     @Async("customPool")
     public CompletableFuture<List<Obj>> getSomething(int id) {
       // The service ensures that the list is never null, but it can be empty
       return CompletableFuture.completedFuture(/* calling another external RESTful API */);
         }
     }
    
    @RestController
    public class MyController {
    
     /** Typical service @Autowired's */
    
     @GetMapping(/* ... */)
     public WrapperObj getById(String id) {
    
         CompletableFuture<List<String>> service1Result =
                 service1.getSomething(id)
                         .thenApply(result -> {
                             if (result == null) { return null; }
                             return result.stream().map(Obj::getName).collect(Collectors.toList());
                         })
                         .handle((result, exception) -> {
                             if (exception != null) {
                                 // Call another asynchronous logging service which should be easy
                                 return null;
                             } else {
                                 return result;
                             }
                         });
    
         CompletableFuture<List<String>> service2Result =
                 service2.getSomething(id)
                         .thenApply(result -> {
                             if (result == null) { return null; }
                             return result.stream().map(Obj::getName).collect(Collectors.toList());
                         })
                         .handle((result, exception) -> {
                             if (exception != null) {
                                 // Call another asynchronous logging service which should be easy
                                 return null;
                             } else {
                                 return result;
                             }
                         });
    
         // Blocking till we get the results from both services
         List<String> result1 = service1Result.get();
         List<String> result2 = service2Result.get();
    
         /** Where to get the exceptions thrown by the services if both fail
         if (result1 == null && result2 == null) {
             /** Signal that the API needs to fail as a whole */
             throw new CustomException( /** where to get the messages? */);
         }
    
         /** merge and return the result */
     }
    }
    

My question is, Since these services return a list of some object, even if I use CompletableFuture.handle() and check for existence of an exception, I can't return the Exception itself in order to capture and let Spring Advice class handle it (chained to return a list).

One thing I thought of is to use AtomicReference in order to capture the exceptions and set them within the handle() and use them once the futures are done/complete, e.g.

AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();

.handle((result, exception) -> {
    if (exception != null) {
        ce1.set(exception);
        return null; // This signals that there was a failure
    } else {
        return result;
    }
});

List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();

/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
    /** Signal that the API needs to fail as a whole */
    throw new CustomException(/** do logic to capture ce1.get().getMessage()   ce2.get().getMessage() */);
}

First, does this sound like a viable solution in this multi-threaded asynchronous calls?

Second, this looks messy, so I was wondering if there is a more elegant way of capturing these exceptions outside of Spring async pool, and deal with it in the main thread, e.g. combine the exception information and throw it to Spring Advice exception handler.

CodePudding user response:

CompletableFutures are quite cumbersome to deal with, but here would be a more functional and reactive approach IMO.

We'll need that sequence method from https://stackoverflow.com/a/30026710/1225328:

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

Then, I'm using Optional to represent the status of the operations, but a Try monad would suit better (so use one if you have such a utility in your codebase - Java doesn't bring one natively yet):

CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});

Now wait for the two futures and handle the results once available:

CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
    Optional<List<Object>> result1 = results.get(0);
    Optional<List<Object>> result2 = results.get(1);
    if (result1.isEmpty() && result2.isEmpty()) {
        throw new CustomException(...);
    }
    // https://stackoverflow.com/a/18687790/1225328:
    return Stream.of(
            result1.map(Stream::of).orElseGet(Stream::empty),
            result2.map(Stream::of).orElseGet(Stream::empty)
    ).collect(Collectors.toList());
});

Then you would ideally return mergedResults directly and let the framework deal with it for you so that you don't block any thread, or you can .get() on it (which will block the thread), which will throw an ExecutionException if your CustomException (or any other exception) is thrown (accessible in e.getCause()).


This would look simpler with Project Reactor (or equivalent), in case you're using it already, but the idea would be roughly the same.

CodePudding user response:

Assuming two futures

CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …

a straight-forward approach to combine the two futures is

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

but this future will fail if either future fails.

To fail only when both futures failed and construct a new exception from both throwable, we can define two utility methods:

private static Throwable getThrowable(CompletableFuture<?> f) {
    return f.<Throwable>thenApply(value -> null)
            .exceptionally(throwable -> throwable).join();
}

private static <T> T throwCustom(Throwable t1, Throwable t2) {
    throw new CustomException(t1.getMessage()   " and "   t2.getMessage());
}

The method getThrowable is intended to be used with a future already known to be completed exceptionally. We could call join and catch the exception, but as show above, we can also turn transform the future to a non-exceptional future containing the throwable as its value.

Then, we can combine all of the above to

CompletableFuture<List<String>> failOnlyWhenBothFailed = both
    .thenApply(list -> both)
    .exceptionally(t ->
        !service1Result.isCompletedExceptionally()? service1Result:
        !service2Result.isCompletedExceptionally()? service2Result:
        throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
    .thenCompose(Function.identity());

Within the function passed to exceptionally, the incoming futures are already known to be completed, so we can use the utility methods to extract the throwables and throw a new exception.

The advantage of this is that the resulting construction is non-blocking.

But in your case, you want to wait for the completion rather than returning a future, so we can simplify the operation:

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

both.exceptionally(t -> null).join();

if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
  Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
  throw new CustomException(t1.getMessage()   " and "   t2.getMessage());
}

List<String> result = (
    service1Result.isCompletedExceptionally()? service2Result:
    service2Result.isCompletedExceptionally()? service1Result: both
).join();

By using both.exceptionally(t -> null).join(); we wait for the completion of both jobs, without throwing an exception on failures. After this statement, we can safely use isCompletedExceptionally() to check the futures we know to be completed.

So if both failed, we extract the throwables and throw our custom exception, otherwise, we check which task(s) succeeded and extract the result of either or both.

  • Related