Home > Back-end >  Syncrhonization across two queues and threads
Syncrhonization across two queues and threads

Time:10-22

I have two concurrent threads (producer and consumer), and two queues (pending and execution).

This is a sample flow for producer:

"1" - If not duplicate (does not exist in any of queues), push task T1

"3" - If not duplicate (does not exist in any of queues), push task T1

And this is a sample flow for consumer:

"2" - Poll data from "pending" queue

"4" - If found something, push it into "execution" queue and run it in a separate thread.

Notice the numbering above:

  • If between steps 2 and 4, step 3 happens, it can insert a duplicate because the data is still in-memory and is not pushed into "execution" queue yet.

How can I prevent this? I can not put a lock on both queues because then the "consumer" thread will always keep the lock (it is an always running thread polling for data).

P.S.

This is how my consumer looks like:

while ( true ) {
  var nextTask = pending.poll(100, MILLISECOND); //STEP 2
  if ( nextTask != null ) {
    executeQueue.add(nextTask); //STEP 4
    executeInParallel(nextTask); 
  }
}

CodePudding user response:

This might not be a direct answer to your question. But if you are looking for a solution to queue tasks to process them with a number of threads you should have a look at the Executors from Java.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobQueue {
    public static void main(String[] args) {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

        executorService.submit(() -> {
            // do someting
            return "result";
        });
    }
}


Update: check queue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class JobQueue {
    public static void main(String[] args) {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

        Runnable task = () -> {
            // do someting
        };

        BlockingQueue<Runnable> queue = executorService.getQueue();
        if(!queue.contains(task)) {
            executorService.submit(task);
        }
    }
}

CodePudding user response:

You can synchromize on an Object uning wait() and notify()

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class JobQueue {
    private static final Object syncMon = new Object();

    private static final Queue<Object> pending = new ArrayDeque<>();
    private static final Set<Object> executing = new HashSet<>();

    public void produce(Object o) {
        synchronized (syncMon) {
            if(pending.contains(o) || executing.contains(o))
                return;

            pending.add(o);
            syncMon.notifyAll();
        }
    }

    public Object consume() throws InterruptedException {
        synchronized (syncMon) {
            if(pending.isEmpty())
                syncMon.wait();

            Object task = pending.poll();
            if(task != null) {
                executing.add(task);
            }
            return task;
        }
    }

    public void complete(Object task) {
        synchronized (syncMon) {
            executing.remove(task);
        }
    }
}

The syncMon object is not necesarry. You could also use wait() and notify() on the Queue directly.

like pending.notifyAll();


To explain this a litte: If you invoke wait() in a synchronized block the lock is released. So the producer can enter the synchronized while the consumer is waiting. If you call notify() or notifyAll() the waiting thread wakes up and takes the lock back, once the producer has exited the synchronized block.


Update 1: add execution set.

  • Related