Home > Net >  Start execution if it is not in progress yet, otherwise wait for the existing execution to finish in
Start execution if it is not in progress yet, otherwise wait for the existing execution to finish in

Time:12-31

Imagine I have a class whose method (process) can be called concurrently. This method does some processing (doProcess) that cannot be executed concurrently. Moreover, if process method was called, but processing (doProcess) is already in progress - invocation of process should wait until the already started processing (doProcess) is finished and then return (without starting new processing - doProcess). Calling process method will only do actual processing (doProcess) if none of threads is doing actual processing (doProcess) yet.

// Just imaginary example to illustrate the idea
class MyResource() {
   val lock = MyLock()
   fun process() {
      if (!lock.isLocked) {
         lock.lock()
         doProcess()
         lock.unlock()
      }
      lock.awaitUnlocked()
   }
   private fun doProcess() {
      // actual processing
   }
}

What would be the intended concurrency (java.util.concurrent) primitive for the concurrency problem described in the imaginary example above? I thought about ReentrantLock but it does not offer awaitUnlocked. And the java.util.concurrent.locks.Condition seems too low-level for this simple use case.

CodePudding user response:

Sounds like what you are after could be accomplished with a ConcurrentMap - more specifically, computeIfAbsent.

For instance, you can run the code below. Works a bit like a cache - if you what you want is available, simply return it. Otherwise wait for which ever thread gets there first to compute it first.

import java.time.Instant;
import java.util.concurrent.*;

public class ConcurrencyTest
{
    private final ConcurrentMap<String, String> processItems = new ConcurrentHashMap<>();

    public String process()
    {
        return processItems.computeIfAbsent("doProcess", k -> doProcess());
    }

    private String doProcess()
    {
        try
        {
            System.out.println(Instant.now()   " -> doProcess() Starting some work on "   Thread.currentThread());
            Thread.sleep(100);
            System.out.println(Instant.now()   " -> doProcess() Finished some work on "   Thread.currentThread());
            return Instant.now().toString();
        }
        catch(Exception e)
        {
            throw new RuntimeException("Unexpected Exception : "   e, e);
        }
    }

    /**
     * And to test it out.
     *
     * @param args
     * @throws Exception
     */
    public static void main(String... args) throws Exception
    {
        final int THREADS = 10;
        final ConcurrencyTest test = new ConcurrencyTest();
        final ExecutorService execs = Executors.newFixedThreadPool(THREADS);
        final CountDownLatch startingGun = new CountDownLatch(1);

        for (int i = 0; i < THREADS; i  )
        {
            execs.submit(() -> {
                System.out.println(Instant.now()   " -> Thread -> "   Thread.currentThread()   " - Awaiting Starting Gun");
                try
                {
                    startingGun.await();
                }
                catch (InterruptedException e)
                {
                    throw new RuntimeException("Failure waiting for the starting gun.");
                }
                System.out.println(Instant.now()   " -> Running Thread -> "   Thread.currentThread());
                String val = test.process();
                System.out.println(Instant.now()   " -> Got back "   val   " -> "   Thread.currentThread());
            });
        }

        System.out.println("All tasks submitted.. waiting for 5 seconds then firing the starting gun. ");
        Thread.sleep(5_000);
        startingGun.countDown();
        execs.shutdown();
    }

}

Gets me the output below. As you can see only one thread end up executing the code in question. The rest wait on it.

All tasks submitted.. waiting for 5 seconds then firing the starting gun. 
2021-12-30T17:59:51.696626Z -> Thread -> Thread[pool-1-thread-3,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696606Z -> Thread -> Thread[pool-1-thread-8,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696245Z -> Thread -> Thread[pool-1-thread-10,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696231Z -> Thread -> Thread[pool-1-thread-6,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696627Z -> Thread -> Thread[pool-1-thread-5,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696633Z -> Thread -> Thread[pool-1-thread-4,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696274Z -> Thread -> Thread[pool-1-thread-2,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696231Z -> Thread -> Thread[pool-1-thread-1,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696620Z -> Thread -> Thread[pool-1-thread-7,5,main] - Awaiting Starting Gun
2021-12-30T17:59:51.696261Z -> Thread -> Thread[pool-1-thread-9,5,main] - Awaiting Starting Gun
2021-12-30T17:59:56.695927Z -> Running Thread -> Thread[pool-1-thread-5,5,main]
2021-12-30T17:59:56.696031Z -> Running Thread -> Thread[pool-1-thread-6,5,main]
2021-12-30T17:59:56.695968Z -> Running Thread -> Thread[pool-1-thread-4,5,main]
2021-12-30T17:59:56.695863Z -> Running Thread -> Thread[pool-1-thread-10,5,main]
2021-12-30T17:59:56.695832Z -> Running Thread -> Thread[pool-1-thread-8,5,main]
2021-12-30T17:59:56.696063Z -> Running Thread -> Thread[pool-1-thread-2,5,main]
2021-12-30T17:59:56.696133Z -> Running Thread -> Thread[pool-1-thread-3,5,main]
2021-12-30T17:59:56.696254Z -> Running Thread -> Thread[pool-1-thread-9,5,main]
2021-12-30T17:59:56.696230Z -> Running Thread -> Thread[pool-1-thread-7,5,main]
2021-12-30T17:59:56.696204Z -> Running Thread -> Thread[pool-1-thread-1,5,main]
2021-12-30T17:59:56.714154Z -> doProcess() Starting some work on Thread[pool-1-thread-4,5,main]
2021-12-30T17:59:56.814608Z -> doProcess() Finished some work on Thread[pool-1-thread-4,5,main]
2021-12-30T17:59:56.815375Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-5,5,main]
2021-12-30T17:59:56.815422Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-7,5,main]
2021-12-30T17:59:56.815104Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-8,5,main]
2021-12-30T17:59:56.815065Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-3,5,main]
2021-12-30T17:59:56.815093Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-2,5,main]
2021-12-30T17:59:56.815054Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-4,5,main]
2021-12-30T17:59:56.815420Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-6,5,main]
2021-12-30T17:59:56.815387Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-1,5,main]
2021-12-30T17:59:56.815077Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-9,5,main]
2021-12-30T17:59:56.815435Z -> Got back 2021-12-30T17:59:56.815016Z -> Thread[pool-1-thread-10,5,main]
  • Related