This is a simplified example I put together to show my problem. I have a task, doSomeWork(), that I run in a multithreaded fashion using an ExecutorService (at most 4 threads at a time). However, if any of the tasks throws an exception, I would like to:
Stop any further tasks from being processed.
Catch the exception at the main thread level.
public static void main(String[] args) {
    final ExecutorService threadPool = Executors.newFixedThreadPool(4);
    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(threadPool);
    try {
        for (int i = 0; i < 10; i++) {
            int b = i;
            completionService.submit(() -> doSomeWork(b));
        }
        threadPool.shutdown();
        threadPool.awaitTermination(8, TimeUnit.HOURS);
        System.exit(0);
    } catch (Exception e) {
        System.out.println("Something wrong happened: " + e.getMessage());
    }
    System.exit(1);
}

// This function has a 50% chance of throwing an exception
public static Void doSomeWork(int i) throws Exception {
    Thread.sleep(500);
    if ((Math.random() > 0.5)) {
        System.out.println("I have reached indice: " + i);
    } else {
        throw new Exception("I couldn't handle indice " + i);
    }
    return null;
}
Currently, an execution would output something like this:
I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
I have reached indice: 4
I have reached indice: 6
I have reached indice: 7
I have reached indice: 5
I have reached indice: 9
As you can see, indice 3 is missing, yet the remaining tasks still ran to completion. Nothing was printed about the exception either.
My desired output would be something like this:
I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
Something wrong happened: I couldn't handle indice 3
The other solutions I found for this problem use a Callable with a Future, but in a blocking fashion. I can't block the other threads while waiting on a future; otherwise the whole multithreading setup is pointless.
CodePudding user response:
You can do that using CompletableFuture. This is the main function I tested:
final ExecutorService executorService = Executors.newFixedThreadPool(4);
final List<CompletableFuture<Void>> all = new ArrayList<>();
try {
    for (int i = 0; i < 10; i++) {
        int b = i;
        CompletableFuture<Void> v = CompletableFuture.runAsync(() -> {
            try {
                doSomeWork(b);
            } catch (Exception e) {
                // Runnable cannot throw checked exceptions, so wrap it
                throw new RuntimeException(e);
            }
        },
        executorService);
        all.add(v);
    }
    CompletableFuture<Void> placeholder = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
    failFast(all, placeholder);
    System.out.println("All tasks ended");
} catch (Exception e) {
    System.out.println("Something wrong happened: " + e.getMessage());
} finally {
    executorService.shutdownNow();
}
Utility function that stops waiting as soon as one of the futures has failed (or once all of them have completed):
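private static <T> void failFast(List<CompletableFuture<T>> futures, CompletableFuture<T> joint) {
    // Busy-wait until every task is done or one of them has failed.
    while (true) {
        if (joint.isDone()) {
            joint.join(); // returns immediately; rethrows if a task failed
            return;
        }
        for (CompletableFuture<T> future : futures) {
            if (future.isCompletedExceptionally()) {
                future.join(); // rethrows the failure as a CompletionException
                return;
            }
        }
    }
}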
And this is the output I get:
I have reached indice: 1
I have reached indice: 7
I have reached indice: 5
I have reached indice: 4
Something wrong happened: java.lang.RuntimeException: java.lang.Exception: I couldn't handle indice 0
Explanation:
The method CompletableFuture.runAsync() lets you supply a Runnable (wrapping your doSomeWork) and an executor with a certain number of threads. Here, I pass an executor with 4 threads (as you did in your example).
Inside the runnable, I don't just run doSomeWork; I also catch Exception and rethrow it as a RuntimeException. This is needed because Runnable.run() does not declare any checked exceptions, so the checked exception has to be wrapped in an unchecked one; it still stops the task and can be caught in your main.
Each time I create a new CompletableFuture<Void> for the task with the given index i, I store it in a list of completable futures.
The for loop itself takes almost no time to run, since the completable futures execute asynchronously.
Hence, I create a joint completable future with CompletableFuture.allOf(...) and then call the utility function failFast on it, in order to stop as soon as one of the tasks has failed (or to continue until all of them are complete).
So, as soon as one of the futures fails with an exception, failFast stops waiting and hands control back to your main thread, where the RuntimeException thrown inside the lambda expression is reported.
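To make the wrapping concrete, here is a minimal sketch of how the checked exception travels from the lambda back to the caller. It assumes you call join() on the failed future, which rethrows the failure as a CompletionException. The class UnwrapSketch and the helper rootCause are names made up for illustration, and doSomeWork here is a stand-in that always fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapSketch {

    // Stand-in for the question's doSomeWork: always fails with a checked exception.
    static void doSomeWork(int i) throws Exception {
        throw new Exception("I couldn't handle indice " + i);
    }

    // Hypothetical helper: follows getCause() down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Returns { message seen by the caller, message of the original exception }.
    static String[] messages(int i) {
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
            try {
                doSomeWork(i);
            } catch (Exception e) {
                throw new RuntimeException(e); // wrap the checked exception
            }
        });
        try {
            f.join();
            return new String[0];
        } catch (CompletionException e) {
            return new String[] { e.getMessage(), rootCause(e).getMessage() };
        }
    }

    public static void main(String[] args) {
        String[] m = messages(0);
        // java.lang.RuntimeException: java.lang.Exception: I couldn't handle indice 0
        System.out.println(m[0]);
        // I couldn't handle indice 0
        System.out.println(m[1]);
    }
}
```

This is why the output above shows the nested class names: each wrapper's message is the toString() of its cause. If you only want the original message, walking the cause chain as in rootCause is one option.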
Note: thanks to Thomas' comment, I've updated the code to use an ExecutorService instead of a simple Executor. That lets you call shutdownNow() inside your finally block after you catch the exception.
As Thomas also suggests, you may throw a RuntimeException directly inside your doSomeWork function, so you don't need to catch and wrap inside the lambda expression.
Other note: @matt pointed out something I didn't know. The .allOf() future completes only when ALL futures have completed, whether successfully or not.
Hence, as he pointed out, my original solution wouldn't work as is. I've edited the answer again to take his comment into account; thanks, @matt.
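For what it's worth, the polling loop can probably be avoided. The following is only a sketch of one alternative, not the code from this answer: a separate future that completes exceptionally as soon as any task fails, and normally once allOf reports that every task succeeded. The names FailFastSketch, allOrFirstFailure and runTasks are hypothetical, and the task body is a stand-in for doSomeWork that fails only at a chosen index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FailFastSketch {

    // Hypothetical helper: the returned future fails as soon as the first task
    // fails, and completes normally only when all tasks have succeeded.
    static CompletableFuture<Void> allOrFirstFailure(List<CompletableFuture<Void>> futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        for (CompletableFuture<Void> f : futures) {
            f.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((value, error) -> {
                    if (error == null) {
                        result.complete(null);
                    }
                });
        return result;
    }

    // Runs taskCount stand-in tasks on 4 threads. Returns the message of the
    // first failure, or null when every task succeeded.
    static String runTasks(int taskCount, int failingIndex) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Void>> all = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                int b = i;
                all.add(CompletableFuture.runAsync(() -> {
                    if (b == failingIndex) {
                        throw new IllegalStateException("I couldn't handle index " + b);
                    }
                    try {
                        Thread.sleep(200); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // interrupted by shutdownNow()
                    }
                }, pool));
            }
            allOrFirstFailure(all).join(); // blocks the main thread only
            return null;
        } catch (CompletionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return cause.getMessage();
        } finally {
            pool.shutdownNow(); // drop queued tasks, interrupt running ones
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10, 3));
    }
}
```

The main thread blocks in join() without spinning, while the worker threads keep running in parallel until the first failure triggers shutdownNow() in the finally block.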
CodePudding user response:
It sounds like you've excluded a correct way to do this, based on an incorrect assumption. Keep your futures.
List<Future<?>> futures = new ArrayList<>();
Then when you submit.
futures.add( completionService.submit( () -> doSomeWork(b) ) );
Now, you can check the futures status in your main thread.
for (Future<?> f : futures) {
    try {
        f.get();
    } catch (ExecutionException e) {
        // Execution exception handled on the main thread.
        // shutdownNow() lives on the ExecutorService, not on the completion service.
        threadPool.shutdownNow();
    } catch (InterruptedException ie) {
        // What should happen here?
    }
}
That way, shutdownNow gets called, so all of the tasks that have not started yet are returned and will never start.
You could use a timeout with get to check every task in turn, since some of them will be running in parallel.
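To illustrate what shutdownNow does with queued work, here is a small sketch. It is not part of the original example: ShutdownNowSketch and countNeverStarted are hypothetical names, and the tasks simply block on a latch so that the outcome does not depend on timing. It assumes there are at least as many tasks as threads.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {

    // Submits `tasks` blocking tasks to a pool of `threads` workers, waits until
    // every worker is busy, then calls shutdownNow() and reports how many tasks
    // were still queued. Assumes tasks >= threads.
    static int countNeverStarted(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch started = new CountDownLatch(threads);
        CountDownLatch never = new CountDownLatch(1); // never counted down
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                started.countDown();
                try {
                    never.await(); // blocks until shutdownNow() interrupts the worker
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            started.await(); // every worker thread is now running a task
            List<Runnable> neverStarted = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return neverStarted.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countNeverStarted(4, 10) + " tasks not started"); // 6 tasks not started
    }
}
```

With 4 threads and 10 tasks, 4 tasks are running when shutdownNow is called, so the remaining 6 come back in the returned list and never run. The running tasks are only interrupted, so they need to react to the interrupt in order to stop early.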
Here is a complete compilable example.
import java.util.concurrent.*;
import java.util.*;

public class ExecutorJunk {
    // Shared counter; not thread-safe, which is good enough for this demo.
    static int count = 0;

    static void task() {
        int z = count++;
        if (z == 3) {
            throw new RuntimeException("z is 3");
        }
        System.out.println("z: " + z);
        try { Thread.sleep(1500); } catch (InterruptedException e) {}
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        List<Future<?>> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            all.add(service.submit(ExecutorJunk::task));
        }
        service.shutdown();
        try {
            while (!service.isTerminated()) {
                for (Future<?> f : all) {
                    try {
                        f.get(1, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException toe) {
                        // Not finished yet; check the next future.
                    }
                }
            }
        } catch (Exception e) {
            // The first failed task ends up here as an ExecutionException.
            System.out.println(service.shutdownNow().size() + " tasks not started");
            e.printStackTrace();
        }
    }
}
When I run this I get.
z: 0
z: 1
z: 2
z: 4
5 tasks not started
java.util.concurrent.ExecutionException: java.lang.RuntimeException: z is 3
...
It could probably be done a bit smarter, for example by removing futures from the list once get succeeds, or, instead of using a timeout, by checking whether each future is done and only then calling future.get.
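A rough sketch of that refinement might look like the following. It is an illustration under my own assumptions rather than a tested replacement for the example above: PollingSketch, sampleTasks and firstFailure are hypothetical names, the tasks sleep for 100 ms as simulated work, and the 10 ms pause between passes is an arbitrary choice to avoid spinning.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PollingSketch {

    // Builds n stand-in tasks; the one at failingIndex throws immediately.
    static List<Callable<Void>> sampleTasks(int n, int failingIndex) {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            int z = i;
            tasks.add(() -> {
                if (z == failingIndex) {
                    throw new RuntimeException("z is " + z);
                }
                Thread.sleep(100); // simulated work
                return null;
            });
        }
        return tasks;
    }

    // Returns the message of the first failure, or null if every task succeeded.
    static String firstFailure(List<Callable<Void>> tasks, int threads) {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        List<Future<Void>> pending = new ArrayList<>();
        for (Callable<Void> t : tasks) {
            pending.add(service.submit(t));
        }
        service.shutdown(); // accept no new tasks; queued ones still run
        try {
            while (!pending.isEmpty()) {
                Iterator<Future<Void>> it = pending.iterator();
                while (it.hasNext()) {
                    Future<Void> f = it.next();
                    if (f.isDone()) {
                        f.get();     // does not block; throws if the task failed
                        it.remove(); // finished successfully, stop checking it
                    }
                }
                Thread.sleep(10); // short pause between passes
            }
            return null;
        } catch (ExecutionException e) {
            service.shutdownNow(); // queued tasks will never start
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstFailure(sampleTasks(10, 3), 4)); // z is 3
    }
}
```

Because get is only called on futures that are already done, the main thread never blocks on a single slow task, and the list shrinks as tasks finish.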