Can multiple threads work with values from the same list at the same time? For example, we have 3 threads and a list of tasks that they have to perform:
ExecutorService executor = Executors.newFixedThreadPool(3);
List<String> tasks = Arrays.asList("firstTask", "secondTask", "thirdTask", "foiuthTask", "fifthTask");
The threads start their work and take the first three tasks, but the second thread completes its task the fastest. It should then take the 4th task, provided no other thread has already claimed it, and so on. Is this implementation possible? I can't find any information about this.
I tried to work with parallel streams, but always got wrong results.
CodePudding user response:
You should submit the tasks to the ExecutorService and it will handle dispatching them to any available threads.
executor.submit(task)
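As a rough illustration of that dispatching, here is a minimal sketch. The class name SubmitDemo and the method runAll are hypothetical, made up for this example, and the "work" is just building a string that names the pool thread that picked up the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original question.
class SubmitDemo
{
    // Submits every task to a 3-thread pool and returns one report line per task, in submission order.
    static List<String> runAll ( List<String> tasks )
    {
        ExecutorService executor = Executors.newFixedThreadPool( 3 );
        List<Future<String>> futures = new ArrayList<>();
        for ( String task : tasks )
        {
            // Each submitted task waits in the pool's queue until one of the 3 threads is free.
            futures.add( executor.submit( () -> task + " ran on " + Thread.currentThread().getName() ) );
        }
        executor.shutdown(); // Accept no new tasks; tasks already submitted still run.

        List<String> results = new ArrayList<>();
        for ( Future<String> future : futures )
        {
            try
            {
                results.add( future.get() ); // Blocks until this particular task has finished.
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                break;
            }
            catch ( ExecutionException e )
            {
                throw new IllegalStateException( e.getCause() );
            }
        }
        return results;
    }

    public static void main ( String[] args )
    {
        runAll( List.of( "firstTask" , "secondTask" , "thirdTask" , "foiuthTask" , "fifthTask" ) )
                .forEach( System.out :: println );
    }
}
```

Which thread ends up with the 4th and 5th tasks depends on which one becomes free first, so the thread names in the output can differ from run to run.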
You can also use a BlockingQueue instead of a list if you want to do this outside of an Executor, as this is how a thread-pool ExecutorService such as the one returned by Executors.newFixedThreadPool works internally.
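A minimal sketch of the queue-based approach, assuming all tasks are known up front. The class QueueWorkers and its process method are hypothetical names for this example. Because every task is already in the queue before the workers start, the workers use the non-blocking poll() and stop when it returns null; a setup where tasks keep arriving would more likely use take() together with some stop signal.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: plain threads draining one shared queue.
class QueueWorkers
{
    // Starts `workerCount` threads that pull tasks from a shared queue; returns one record per handled task.
    static List<String> process ( List<String> tasks , int workerCount )
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>( tasks );
        Queue<String> done = new ConcurrentLinkedQueue<>(); // Thread-safe collector for the records.
        Thread[] workers = new Thread[ workerCount ];
        for ( int i = 0 ; i < workerCount ; i++ )
        {
            workers[ i ] = new Thread( () -> {
                String task;
                // poll() removes the head atomically, so no task is handed to two threads.
                while ( ( task = queue.poll() ) != null )
                {
                    done.add( Thread.currentThread().getName() + " handled " + task );
                }
            } , "worker-" + i );
            workers[ i ].start();
        }
        for ( Thread worker : workers )
        {
            try
            {
                worker.join(); // Wait for this worker to run out of tasks.
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
        }
        return List.copyOf( done );
    }

    public static void main ( String[] args )
    {
        process( List.of( "firstTask" , "secondTask" , "thirdTask" , "foiuthTask" , "fifthTask" ) , 3 )
                .forEach( System.out :: println );
    }
}
```

A worker that finishes early simply loops back and polls the next task, which is the behavior asked about in the question.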
CodePudding user response:
I would pass each of those input strings to an instance of a Runnable task. Then feed the tasks to an executor service.
Note that a Runnable has no result to return. A Callable does. Our example task here has no result, so it can be defined as a Runnable. However, the convenient ExecutorService#invokeAll method takes only Callable objects. So we use the Executors.callable method to make a Callable object of each of our Runnable objects.
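To see that adaptation in isolation, here is a small sketch. The class CallableAdapterDemo and its runAdapted methods are hypothetical names for this example. The one-argument Executors.callable produces a Callable whose call() runs the Runnable and returns null; the two-argument overload returns the value you supply instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class showing what Executors.callable produces.
class CallableAdapterDemo
{
    // Wraps the Runnable in a Callable, calls it directly, and returns whatever call() yields (null here).
    static Object runAdapted ( Runnable runnable )
    {
        Callable<Object> callable = Executors.callable( runnable );
        try
        {
            return callable.call();
        }
        catch ( Exception e )
        {
            throw new IllegalStateException( e );
        }
    }

    // Same idea with the two-argument overload, which yields the supplied result after running the task.
    static < T > T runAdapted ( Runnable runnable , T result )
    {
        Callable<T> callable = Executors.callable( runnable , result );
        try
        {
            return callable.call();
        }
        catch ( Exception e )
        {
            throw new IllegalStateException( e );
        }
    }

    public static void main ( String[] args )
    {
        Runnable task = () -> System.out.println( "task ran" );
        System.out.println( "one-arg adapter returned: " + runAdapted( task ) );
        System.out.println( "two-arg adapter returned: " + runAdapted( task , "done" ) );
    }
}
```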
Beware: Output sent to System.out across threads does not necessarily appear on the console in chronological order. Always include a timestamp such as Instant.now(). If you care about sequence, study those timestamps. Or else use a reporting mechanism other than System.out.
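One possible alternative reporting mechanism, sketched under the assumption that it is acceptable to report only after all tasks finish: each task adds a timestamped record to a thread-safe collection, and the records are sorted once the pool has terminated. The class OrderedReport, the record Event, and the method runAndCollect are hypothetical names for this example.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: collect timestamped events, then sort them for reporting.
class OrderedReport
{
    record Event ( Instant when , String message ) { }

    // Runs one task per input on a 3-thread pool and returns the recorded events sorted by timestamp.
    static List<Event> runAndCollect ( List<String> inputs )
    {
        Queue<Event> events = new ConcurrentLinkedQueue<>(); // Safe for concurrent add() calls.
        ExecutorService executorService = Executors.newFixedThreadPool( 3 );
        for ( String input : inputs )
        {
            executorService.execute( () -> events.add( new Event( Instant.now() , "input = " + input ) ) );
        }
        executorService.shutdown();
        try
        {
            // Wait up to 60 seconds for the tasks to finish before reading the events.
            executorService.awaitTermination( 60 , TimeUnit.SECONDS );
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
        List<Event> sorted = new ArrayList<>( events );
        sorted.sort( Comparator.comparing( Event :: when ) ); // Chronological by captured timestamp.
        return sorted;
    }

    public static void main ( String[] args )
    {
        runAndCollect( List.of( "firstTask" , "secondTask" , "thirdTask" , "foiuthTask" , "fifthTask" ) )
                .forEach( event -> System.out.println( event.message() + " processed at " + event.when() ) );
    }
}
```

The order here reflects when each task captured its timestamp, not the order in which the inputs were submitted.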
Be sure to properly shut down your executor service. See the boilerplate code in the ExecutorService Javadoc, in the shutdownAndAwaitTermination method. I slightly modified that code here. Notice that ExecutorService#awaitTermination blocks until all submitted tasks complete, the timeout elapses, or the current thread is interrupted, whichever happens first.
In the future, Project Loom may bring another way to execute a group of tasks using structured concurrency; its features are being previewed now in Java 20. If curious, see the relevant JEPs, 425 and 428.
package work.basil.example.tsk;

import java.time.*;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        ExecutorService executorService = Executors.newFixedThreadPool( 3 );
        List < Future < Object > > futures;
        try
        {
            futures = executorService.invokeAll( this.getTasks() );
        }
        catch ( InterruptedException e )
        {
            throw new RuntimeException( e );
        }
        this.shutdownAndAwaitTermination( executorService );
    }

    private Collection < Callable < Object > > getTasks ( )
    {
        List < String > inputs = List.of( "firstTask" , "secondTask" , "thirdTask" , "foiuthTask" , "fifthTask" );
        List < Callable < Object > > tasks =
                inputs
                        .stream()
                        .map( input -> new StringProcessor( input ) ) // Instantiate a task for each input, a `StringProcessor` implementing `Runnable` interface.
                        .map( Executors :: callable ) // Make `Callable` objects from our `Runnable` objects, to appease the `ExecutorService#invokeAll` method.
                        .toList();
        return tasks;
    }

    void shutdownAndAwaitTermination ( ExecutorService executorService )
    {
        executorService.shutdown(); // Disable new tasks from being submitted
        try
        {
            // Wait a while for existing tasks to terminate
            if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
            {
                executorService.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                { System.err.println( "Executor service did not terminate. " + Instant.now() ); }
            }
        }
        catch ( InterruptedException ex )
        {
            // (Re-)Cancel if current thread also interrupted
            executorService.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

class StringProcessor implements Runnable
{
    String input;

    public StringProcessor ( final String input )
    {
        this.input = input;
    }

    @Override
    public void run ( )
    {
        System.out.println( "input = " + input + " processed at " + Instant.now() );
    }
}
When run:
input = thirdTask processed at 2022-11-03T20:02:22.543733Z
input = firstTask processed at 2022-11-03T20:02:22.543782Z
input = foiuthTask processed at 2022-11-03T20:02:22.555666Z
input = fifthTask processed at 2022-11-03T20:02:22.555675Z
input = secondTask processed at 2022-11-03T20:02:22.543744Z