Prevent thread blocking queued threads

Time:01-10

I have a task that will run many times with different values. I'd like to prevent two tasks with the same string value from executing at the same time. Below is an example of the strings. These values will change, but for simplicity I have hard-coded them in the example. I submit these tasks via an ExecutorService. The tasks run, but the second "hi" blocks the other tasks from running, so only 4 of 5 tasks run concurrently. Once the first "hi" releases the lock, the fifth task continues and the other tasks run fine. Is there a way to prevent this kind of blocking, so that the other three tasks can run ahead of the waiting one and nothing queues until there are actually five tasks running concurrently?

Submission of the tasks:

executor.submit(new Task("hi"));
executor.submit(new Task("h"));
executor.submit(new Task("u"));
executor.submit(new Task("y"));
executor.submit(new Task("hi"));
executor.submit(new Task("p"));
executor.submit(new Task("o"));
executor.submit(new Task("bb"));

The Task is simple. It just prints out the string:

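Lock l = getLock(x);
l.lock(); // acquire before the try block, so finally only unlocks a lock that is held
try {
    System.out.println(x);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException ex) {
        Logger.getLogger(Task.class.getName()).log(Level.SEVERE, null, ex);
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
} finally {
    l.unlock();
}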

I've updated the post to allow for things to be more clearly understood...

CodePudding user response:

To avoid blocking a thread, you have to ensure that the action doesn't even start before the previous one has completed. For example, you can use a CompletableFuture to chain an action so that it is scheduled only when the previous one has completed:

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for(int i = 0; i < 5; i++) submit("one", task("one"), es);
    for(int i = 0; i < 5; i++) submit("two", task("two"), es);
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(26));
    es.shutdown();
}

static Runnable task(String x) {
    return () -> {
        System.out.println(x);
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    };
}

static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
    = new ConcurrentHashMap<>();

static final void submit(String key, Runnable task, Executor e) {
    CompletableFuture<Void> job = MAP.compute(key,
        (k, previous) -> previous != null?
            previous.thenRunAsync(task, e): CompletableFuture.runAsync(task, e));
    job.whenComplete((v,t) -> MAP.remove(key, job));
}

The ConcurrentHashMap allows us to handle these cases as atomic updates:

  • If no previous future exists for a key, just schedule the action, creating the future

  • If a previous future exists, chain the action, to be scheduled when the previous one has completed; the dependent action becomes the new future

  • When a job completes, the two-arg remove(key, job) will remove it if and only if it is still the current job, so a newer job chained after it stays in the map
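The three cases above can be checked with a small self-contained sketch. It reuses the submit method from the answer (changed only to return the future) and feeds it the eight strings from the question on a pool of five threads. The class name KeyedSubmitDemo, the helper maxSameKeyOverlap and the shortened 100 ms sleep are assumptions made for illustration, not part of the original post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class KeyedSubmitDemo {
    static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
        = new ConcurrentHashMap<>();

    // Same logic as the answer's submit, but returning the future so callers can wait on it.
    static CompletableFuture<Void> submit(String key, Runnable task, Executor e) {
        CompletableFuture<Void> job = MAP.compute(key,
            (k, previous) -> previous != null
                ? previous.thenRunAsync(task, e)       // chain behind the current job for this key
                : CompletableFuture.runAsync(task, e)); // no job for this key yet: schedule directly
        job.whenComplete((v, t) -> MAP.remove(key, job)); // remove only if still the current job
        return job;
    }

    // Hypothetical helper: runs the question's eight tasks and returns the highest
    // number of tasks with the same key that were observed running at the same time.
    static int maxSameKeyOverlap() {
        ExecutorService es = Executors.newFixedThreadPool(5);
        ConcurrentHashMap<String, AtomicInteger> running = new ConcurrentHashMap<>();
        AtomicInteger maxOverlap = new AtomicInteger();
        List<CompletableFuture<Void>> jobs = new ArrayList<>();
        for (String key : new String[] {"hi", "h", "u", "y", "hi", "p", "o", "bb"}) {
            jobs.add(submit(key, () -> {
                AtomicInteger counter = running.computeIfAbsent(key, k -> new AtomicInteger());
                maxOverlap.accumulateAndGet(counter.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(100); // stand-in for the 5 second sleep in the question
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                counter.decrementAndGet();
            }, es));
        }
        // Wait for every job, including the chained second "hi".
        CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
        es.shutdown();
        return maxOverlap.get();
    }

    public static void main(String[] args) {
        System.out.println("max same-key overlap: " + maxSameKeyOverlap());
    }
}
```

Because the second "hi" is only handed to the executor once the first has completed, it never occupies a pool thread while waiting, and the reported overlap stays at 1.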

The example in the main method demonstrates how two independent sequences of actions can run on a thread pool of two threads without ever blocking a thread.
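One caveat that may be worth handling: a stage created with thenRunAsync does not run its action when the previous stage completed exceptionally, so a task that throws would cause any tasks already chained behind it for the same key to be skipped. The following is a minimal sketch of one possible variation, assuming later tasks should still run after a failure. The class name TolerantKeyedSubmit and the helper secondRunsAfterFailure are hypothetical; the only change to submit is the added exceptionally(t -> null) step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class TolerantKeyedSubmit {
    static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
        = new ConcurrentHashMap<>();

    static CompletableFuture<Void> submit(String key, Runnable task, Executor e) {
        CompletableFuture<Void> job = MAP.compute(key,
            (k, previous) -> previous != null
                // Swallow a failure of the previous job so the chained task still runs.
                ? previous.exceptionally(t -> null).thenRunAsync(task, e)
                : CompletableFuture.runAsync(task, e));
        job.whenComplete((v, t) -> MAP.remove(key, job));
        return job;
    }

    // Hypothetical check: the first "hi" task fails, the second one should still run.
    static boolean secondRunsAfterFailure() {
        ExecutorService es = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean();
        submit("hi", () -> {
            try {
                release.await(); // keep the first job pending until the second is chained
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("simulated failure");
        }, es);
        CompletableFuture<Void> second = submit("hi", () -> secondRan.set(true), es);
        release.countDown(); // let the first job fail now
        second.join();       // completes normally because the failure was swallowed
        es.shutdown();
        return secondRan.get();
    }

    public static void main(String[] args) {
        System.out.println("second task ran: " + secondRunsAfterFailure());
    }
}
```

Whether swallowing the failure is appropriate depends on the application; logging the throwable inside exceptionally instead of discarding it is a small change.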
