I am using an Executor service to execute set of parallel tasks and I need one done something after all the tasks get completed. But in my below implementation it is not happening as expected and it is not waiting till all done. Please refer to the code snippet below.
public void doParallelStuff() throws InterruptedException {
final int THREAD_COUNT = 5;
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
Set<String> myIdSet = Set.of("123", "234", "345", "897", "893"); // actual set is having 20k IDs
CountDownLatch countDownLatch = new CountDownLatch(myIdSet.size());
myIdSet.forEach(id -> executorService.execute(() -> {
// retrieve & do processing the entity related to id
countDownLatch.countDown();
}));
countDownLatch.await(10L, TimeUnit.SECONDS);
executorService.shutdown();
// do some other stuff after all completed
// (but this point is getting executed before the all tasks get completed)
}
CodePudding user response:
Lose the CountDownLatch
You don't need the CountDownLatch
. An executor service provides that feature, built-in.
shutdown
& awaitTermination
in ExecutorService
The key is making two calls:
ExecutorService#shutdown
to stop the executor service from accepting more task submissions.ExecutorService#awaitTermination
to block and wait for all submitted tasks to finish.
Here is the main chunk of code excerpted from a full example app posted further down.
First we generate some fake input. And we define a thread-safe list to hold Result
objects, with each submitted task instantiated and collecting a fresh Result
object.
Set < Integer > ids = IntStream.range( 1 , 20_001 ).boxed().collect( Collectors.toSet() );
List < Result > results = Collections.synchronizedList( new ArrayList <>() );
Next we define a collection to hold our pending tasks. We populate with many Task
objects.
List < Task > tasks = new ArrayList <>( ids.size() );
for ( Integer id : ids ) tasks.add( new Task( id , results ) );
We create an executor service with a reasonable number of threads, as appropriate to our deployment machine.
ExecutorService executorService = Executors.newFixedThreadPool( 7 );
Next we define a collection to keep all the Future
objects returned by the executor service when we submit all our tasks.
List < Future < Boolean > > futures = null;
Submit all the tasks, capturing the futures.
try { futures = executorService.invokeAll( tasks , 5 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }