Executor service not waiting until all threads get completed to return the result

Time:07-22

I am using an ExecutorService to execute a set of parallel tasks, and I need to do something after all of the tasks have completed. With the implementation below this does not happen as expected: the code does not wait until every task is done. Please refer to the code snippet below.

public void doParallelStuff() throws InterruptedException {
        
        final int THREAD_COUNT = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        Set<String> myIdSet = Set.of("123", "234", "345", "897", "893"); // the actual set has 20k IDs
        CountDownLatch countDownLatch = new CountDownLatch(myIdSet.size());

        myIdSet.forEach(id -> executorService.execute(() -> {
            // retrieve & do processing the entity related to id
            countDownLatch.countDown();
        }));

        countDownLatch.await(10L, TimeUnit.SECONDS);
        executorService.shutdown();

        // do some other stuff after all tasks have completed
        // (but this point is reached before all the tasks have completed)
    }

Answer:

Lose the CountDownLatch

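You don't need the CountDownLatch. An executor service has that feature built in. Note also that the timed CountDownLatch#await does not guarantee completion: if the 10 seconds elapse before the count reaches zero, it returns false and execution simply continues. With 20k IDs on 5 threads, that is likely what is happening here.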
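To see the timed latch behavior in isolation, here is a minimal sketch. The class name LatchTimeoutDemo, the method awaitWithTimeout, and the sleep that stands in for real work are all made up for illustration. The method returns whatever CountDownLatch#await reports.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutDemo {

    // Runs 4 tasks on 2 threads and returns what the timed await reports:
    // true if every task counted down in time, false if the timeout expired first.
    static boolean awaitWithTimeout(long taskMillis, long timeoutMillis) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(taskMillis); // hypothetical stand-in for the per-ID processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        // Blocks for at most timeoutMillis, then returns regardless of task progress.
        boolean finished = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        executorService.shutdownNow(); // abandon leftover work so the demo exits promptly
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("slow tasks finished in time: " + awaitWithTimeout(500, 50));
        System.out.println("fast tasks finished in time: " + awaitWithTimeout(10, 2000));
    }
}
```

If you do keep a timed await, check its boolean return value before moving on to the follow-up work.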

shutdown & awaitTermination in ExecutorService

The key is making two calls:

  • ExecutorService#shutdown to stop the executor service from accepting more task submissions.
  • ExecutorService#awaitTermination to block and wait for all submitted tasks to finish.
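Those two calls can be sketched as follows. This is a minimal illustration, not the full example app: the class name ShutdownAwaitDemo, the method processAll, the counter that stands in for real processing, and the one-minute timeout are all assumptions.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownAwaitDemo {

    // Submits one task per ID, then waits for all of them before returning the count.
    static int processAll(List<String> ids) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        AtomicInteger processed = new AtomicInteger();
        for (String id : ids) {
            // Hypothetical stand-in for retrieving and processing the entity for this ID.
            executorService.execute(() -> processed.incrementAndGet());
        }
        executorService.shutdown(); // no new submissions; already-submitted tasks still run
        // Blocks until all tasks are done or the (assumed) one-minute timeout expires.
        boolean terminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
        if (!terminated) {
            executorService.shutdownNow(); // give up on whatever is still pending
            throw new IllegalStateException("Tasks did not finish within the timeout");
        }
        return processed.get(); // safe to read: every task has completed
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(List.of("123", "234", "345", "897", "893")));
    }
}
```

Like the timed latch, awaitTermination returns a boolean, so the sketch checks it rather than assuming that the tasks finished.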

Here is the main chunk of code excerpted from a full example app posted further down.

First we generate some fake input. And we define a thread-safe list to hold Result objects; each submitted task instantiates a fresh Result object and adds it to that list.

Set < Integer > ids = IntStream.range( 1 , 20_001 ).boxed().collect( Collectors.toSet() );
List < Result > results = Collections.synchronizedList( new ArrayList <>() );

Next we define a collection to hold our pending tasks. We populate it with one Task object per ID.

List < Task > tasks = new ArrayList <>( ids.size() );
for ( Integer id : ids ) tasks.add( new Task( id , results ) );
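The Task and Result classes are not part of this excerpt. As a rough idea of the shape they need, here is a hypothetical sketch: the fields, the description text, and the wrapper class TaskSketch are invented for illustration. The only things taken from the snippets above are the constructor arguments ( id , results ) and the Boolean return type implied by the futures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

public class TaskSketch {

    // Hypothetical result type: just the ID plus a description of what was done.
    static final class Result {
        final Integer id;
        final String description;

        Result(Integer id, String description) {
            this.id = id;
            this.description = description;
        }
    }

    // Hypothetical task: a Callable so that invokeAll can return a Future<Boolean> for it.
    static final class Task implements Callable<Boolean> {
        private final Integer id;
        private final List<Result> results;

        Task(Integer id, List<Result> results) {
            this.id = id;
            this.results = results;
        }

        @Override
        public Boolean call() {
            // Stand-in for the real work; the shared list must be thread-safe.
            results.add(new Result(id, "processed " + id));
            return Boolean.TRUE;
        }
    }

    public static void main(String[] args) {
        List<Result> results = Collections.synchronizedList(new ArrayList<>());
        Boolean ok = new Task(42, results).call();
        System.out.println(ok + ", results collected: " + results.size());
    }
}
```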

We create an executor service with a reasonable number of threads, as appropriate to our deployment machine.

ExecutorService executorService = Executors.newFixedThreadPool( 7 );

Next we define a collection to keep all the Future objects returned by the executor service when we submit all our tasks.

List < Future < Boolean > > futures = null;

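Submit all the tasks, capturing the futures. The invokeAll call blocks until every task has completed or the timeout expires, whichever comes first.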

try {
    futures = executorService.invokeAll( tasks , 5 , TimeUnit.MINUTES );
} catch ( InterruptedException e ) {
    throw new RuntimeException( e );
}
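Once invokeAll returns, each future is either done or was cancelled because the timeout expired. One way to inspect them is sketched below; the class name InvokeAllDemo, the method countSuccesses, and the trivial tasks are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllDemo {

    // Runs taskCount trivial tasks and counts how many completed and returned true.
    static int countSuccesses(int taskCount) throws InterruptedException {
        List<Callable<Boolean>> tasks = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> Boolean.TRUE); // hypothetical stand-in for a real Task
        }
        ExecutorService executorService = Executors.newFixedThreadPool(7);
        try {
            // Blocks until all tasks complete or the timeout expires.
            List<Future<Boolean>> futures = executorService.invokeAll(tasks, 5, TimeUnit.MINUTES);
            int successes = 0;
            for (Future<Boolean> future : futures) {
                if (future.isCancelled()) {
                    continue; // this task did not finish before the timeout
                }
                try {
                    if (future.get()) { // returns immediately: the task is already done
                        successes++;
                    }
                } catch (ExecutionException e) {
                    // The task threw an exception; count it as a failure.
                }
            }
            return successes;
        } finally {
            executorService.shutdown(); // release the pool's threads
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("successful tasks: " + countSuccesses(100));
    }
}
```

Checking isCancelled first avoids the CancellationException that Future#get throws for a cancelled task.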
