Home > Mobile >  Multi-threaded list manipulation
Multi-threaded list manipulation

Time:11-04

Can multiple threads work with values from the same list at the same time? For example, we have 3 threads and a list of tasks that they have to perform:

ExecutorService executor = Executors.newFixedThreadPool(3);
List<Strng> tasks = Arrays.asList("firstTask", "secondTask", "thirdTask", "foiuthTask", "fifthTask");

The threads started their work and took the first three tasks, but the second thread completed the task the fastest. Now it needs to take the 4th task to work, if it is not occupied by another thread and etc. Is this implementation possible? I can't find any information about this.

I tryid to work with parallel Streams, but always got wrong results.

CodePudding user response:

You should submit the tasks to the ExecutorService and it will handle dispatching them to any available threads.

executor.submit(task)

You can also use a BlockingQueue instead of a list if you want to do this outside of an Executor as this is how the ExecutorService work internally.

CodePudding user response:

I would pass each of those input strings to an instance of a Runnable task. Then feed the tasks to an executor service.

Note that a Runnable has no result to return. A Callable does. Our example task here has no result, so it can be defined as a Runnable. However, the convenient ExecutorService#invokeAll method takes only Callable objects. So we use the Executors.callable method to make a Callable object of each of our Runnable objects.

Beware: Output sent to System.out across threads does not necessarily appear on the console in chronological order. Always include a timestamp such as Instant.now(). If you care about sequence, study those timestamps. Or else use a different reporting mechanism other than System.out.

Be sure to properly shutdown your executor service. See boilerplate code in the Javadoc, in shutdownAndAwaitTermination method. I slightly modified that code here. Notice that ExecutorService#awaitTermination blocks until all submitted tasks complete.

In future features being previewed now in Java 20, Project Loom may bring another way to execute a group of tasks using structured concurrency. If curious, see relevant Java JEPs 425 & 428.

package work.basil.example.tsk;

import java.time.*;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        ExecutorService executorService = Executors.newFixedThreadPool( 3 );
        List < Future < Object > > futures;
        try
        {
            futures = executorService.invokeAll( this.getTasks() );
        }
        catch ( InterruptedException e )
        {
            throw new RuntimeException( e );
        }
        this.shutdownAndAwaitTermination( executorService );
    }

    private Collection < Callable < Object > > getTasks ( )
    {
        List < String > inputs = List.of( "firstTask" , "secondTask" , "thirdTask" , "foiuthTask" , "fifthTask" );

        List < Callable < Object > > tasks =
                inputs
                        .stream()
                        .map( input -> new StringProcessor( input ) )  // Instantiate a task for each input, a `StringProcessor` implementing `Runnable` interface.
                        .map( Executors :: callable ) // Make `Callable` objects from our `Runnable` objects, to appease the  `ExecutorService#invokeAll` method.
                        .toList();
        return tasks;
    }

    void shutdownAndAwaitTermination ( ExecutorService executorService )
    {
        executorService.shutdown(); // Disable new tasks from being submitted
        try
        {
            // Wait a while for existing tasks to terminate
            if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
            {
                executorService.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                { System.err.println( "Executor service did not terminate. "   Instant.now() ); }
            }
        }
        catch ( InterruptedException ex )
        {
            // (Re-)Cancel if current thread also interrupted
            executorService.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}


class StringProcessor implements Runnable
{
    String input;

    public StringProcessor ( final String input )
    {
        this.input = input;
    }

    @Override
    public void run ( )
    {
        System.out.println( "input = "   input   " processed at "   Instant.now() );
    }
}

When run:

input = thirdTask processed at 2022-11-03T20:02:22.543733Z
input = firstTask processed at 2022-11-03T20:02:22.543782Z
input = foiuthTask processed at 2022-11-03T20:02:22.555666Z
input = fifthTask processed at 2022-11-03T20:02:22.555675Z
input = secondTask processed at 2022-11-03T20:02:22.543744Z
  • Related