I have a task that runs many times with different string values. I'd like to prevent two tasks with the same string value from executing at the same time. The values will vary; the ones in the example below are only there for simplicity. I submit the tasks via an ExecutorService.
The tasks run, but the second "hi" blocks while it waits for the lock, so only 4 of 5 tasks run concurrently. Once the first "hi" releases the lock, the fifth task (the second "hi") continues and the remaining tasks run fine. Is there a way to avoid this blocking, so that the other three tasks can run ahead of the waiting one and nothing queues until five tasks are actually running concurrently?
Submission of the tasks:
executor.submit(new Task("hi"));
executor.submit(new Task("h"));
executor.submit(new Task("u"));
executor.submit(new Task("y"));
executor.submit(new Task("hi"));
executor.submit(new Task("p"));
executor.submit(new Task("o"));
executor.submit(new Task("bb"));
The Task is simple. It just prints out the string:
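Lock l = getLock(x);
l.lock(); // acquire before the try block, so finally never unlocks a lock we do not hold
try {
    System.out.println(x);
    try {
        Thread.sleep(5000); // simulate work while holding the lock
    } catch (InterruptedException ex) {
        Logger.getLogger(Task.class.getName()).log(Level.SEVERE, null, ex);
    }
} finally {
    l.unlock();
}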
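The getLock helper is not shown here. As a rough sketch only, assuming it hands out one ReentrantLock per string value from a shared map (the class name LockRegistry and the LOCKS field are hypothetical), it might look like this:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: one lock per string value, created on first use.
class LockRegistry {
    private static final ConcurrentHashMap<String, Lock> LOCKS = new ConcurrentHashMap<>();

    static Lock getLock(String key) {
        // computeIfAbsent is atomic, so two threads asking for the same key
        // always receive the same Lock instance.
        return LOCKS.computeIfAbsent(key, k -> new ReentrantLock());
    }
}
```

With a helper like this, a pool thread that picks up the second "hi" sits inside lock() until the first "hi" finishes, which is the blocking described above.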
I've updated the post to allow for things to be more clearly understood...
CodePudding user response:
To avoid blocking a thread, you have to ensure that an action doesn't even start before the previous action for the same key has completed. For example, you can use a CompletableFuture to chain an action, so that it is scheduled only when the previous one has completed:
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
for(int i = 0; i < 5; i++) submit("one", task("one"), es);
for(int i = 0; i < 5; i++) submit("two", task("two"), es);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(26));
es.shutdown();
}
static Runnable task(String x) {
return () -> {
System.out.println(x);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
};
}
static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
= new ConcurrentHashMap<>();
static final void submit(String key, Runnable task, Executor e) {
CompletableFuture<Void> job = MAP.compute(key,
(k, previous) -> previous != null?
previous.thenRunAsync(task, e): CompletableFuture.runAsync(task, e));
job.whenComplete((v,t) -> MAP.remove(key, job));
}
The ConcurrentHashMap allows us to handle the cases as atomic updates:
If no previous future exists for a key, just schedule the action, creating the future.
If a previous future exists, chain the action, to be scheduled when the previous one has completed; the dependent action becomes the new future.
If a job has completed, the two-arg remove(key, job) will remove it if and only if it is still the current job.
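To illustrate the last point in isolation, here is a small sketch of the two-arg remove on a ConcurrentHashMap. The class name and the string values standing in for jobs are made up for the illustration; in the code above the map values are CompletableFuture instances.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: strings stand in for the futures stored in MAP.
class ConditionalRemoveDemo {
    static boolean[] demo() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("hi", "job-1");
        map.put("hi", "job-2"); // a newer job has replaced the first one

        // The stale job tries to clean up: nothing is removed, because the
        // current value for "hi" is no longer job-1.
        boolean staleRemoved = map.remove("hi", "job-1");

        // The current job cleans up: the entry is removed.
        boolean currentRemoved = map.remove("hi", "job-2");

        return new boolean[] { staleRemoved, currentRemoved };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```

This is why a completed job cannot accidentally remove a newer job that was chained onto it in the meantime.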
The example in the main method demonstrates how two independent actions can run with a thread pool of two threads, never blocking a thread.
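The same approach can be applied to the strings from the question. The following is a sketch under a few assumptions: a pool of five threads, a short sleep in place of the real work, and tasks that never throw (if a predecessor fails, thenRunAsync skips the dependent task). The class name, the runDemo method and the counters are only there to make the behaviour observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class KeyedSubmitDemo {
    static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP = new ConcurrentHashMap<>();

    // Same logic as submit above, but returns the future so callers can wait for it.
    static CompletableFuture<Void> submit(String key, Runnable task, Executor e) {
        CompletableFuture<Void> job = MAP.compute(key, (k, previous) -> previous != null
                ? previous.thenRunAsync(task, e)
                : CompletableFuture.runAsync(task, e));
        job.whenComplete((v, t) -> MAP.remove(key, job));
        return job;
    }

    // Runs the question's eight strings and returns the highest number of
    // tasks with the same key that were ever observed running at once.
    static int runDemo(long sleepMillis) {
        ExecutorService es = Executors.newFixedThreadPool(5); // assumed pool size
        Map<String, AtomicInteger> running = new ConcurrentHashMap<>();
        AtomicInteger maxSameKey = new AtomicInteger();
        List<CompletableFuture<Void>> jobs = new ArrayList<>();
        String[] keys = { "hi", "h", "u", "y", "hi", "p", "o", "bb" };
        for (String key : keys) {
            jobs.add(submit(key, () -> {
                int now = running.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
                maxSameKey.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(sleepMillis); // stand-in for the real work
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                running.get(key).decrementAndGet();
            }, es));
        }
        CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
        es.shutdown();
        return maxSameKey.get();
    }

    public static void main(String[] args) {
        System.out.println("max same-key overlap: " + runDemo(200)); // prints "max same-key overlap: 1"
    }
}
```

The second "hi" is not handed to the pool until the first one has completed, so it never occupies a thread while waiting, and the other strings can use all five threads in the meantime.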