Home > Enterprise >  Why is one thread getting activated in my Producer Consumer Code
Why is one thread getting activated in my Producer Consumer Code

Time:09-04

I am learning locks and conditions in Java and have implemented producer and consumer code.Idea is to have 1 producer and N Consumer . But when i run the code , always 1 consumer thread read

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProdConsumer {
    int MAX_ELEMENTS = 10;
    Queue<Integer> q ;
    Lock lock =  new ReentrantLock(false);
    Condition read  =  lock.newCondition();
    Condition write = lock.newCondition();

    ProdConsumer(){
        q = new LinkedList<>();
    }

    public static void main(String[] args) throws InterruptedException {
        ProdConsumer pq  = new ProdConsumer();
        new Thread(() -> {
            try {
                new Producer(pq.q).produce(pq.lock,pq.read,pq.write,pq.MAX_ELEMENTS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                System.out.println("Started "   Thread.currentThread().getName());
                new Consumer(pq.q).consume(pq.lock,pq.read,pq.write);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                System.out.println("Started "   Thread.currentThread().getName());
                new Consumer(pq.q).consume(pq.lock,pq.read,pq.write);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

class Producer {
    Queue q;

    Producer(Queue q) {
        this.q = q;
    }

    void produce(Lock lock, Condition read, Condition write, int MAX_ELEMENTS) throws InterruptedException {
        lock.lock();
        try {
            int count = 0;
            while (count < 1000) {
                while (q.size() == MAX_ELEMENTS) {
                    write.await();
                }
                q.add(count);
                System.out.println(Thread.currentThread().getName()   "PRODUCED "   count);
                count  ;
                read.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

class Consumer {
    Queue q;
    Consumer(Queue q ){
        this.q = q;
    }
    void consume(Lock lock, Condition read, Condition write) throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                while (q.size() == 0) {
                    read.await();
                }
                System.out.println(Thread.currentThread().getName()   "CONSUMED "   q.poll());
                Thread.sleep((long)(Math.random() * 1000));
                write.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

Result:

Started Thread-1
Started Thread-2
Thread-0PRODUCED 0
Thread-0PRODUCED 1
Thread-0PRODUCED 2
Thread-0PRODUCED 3
Thread-0PRODUCED 4
Thread-0PRODUCED 5
Thread-0PRODUCED 6
Thread-0PRODUCED 7
Thread-0PRODUCED 8
Thread-0PRODUCED 9
Thread-1CONSUMED 0
Thread-1CONSUMED 1
Thread-1CONSUMED 2
Thread-1CONSUMED 3
Thread-1CONSUMED 4
Thread-1CONSUMED 5
Thread-1CONSUMED 6
Thread-1CONSUMED 7
Thread-1CONSUMED 8
Thread-1CONSUMED 9

Here we can see , even tough both thread starts , always Thread one is consuming. My thought process is , when producer signals all , then either of consumer thread should start. What am i doing wrong ?

CodePudding user response:

Because waiting threads are signalled in FIFO order. ReentrantLock.html#newCondition

So any consumer thread acquires the lock first, it will enter the wait state first, then the thread will be awakened first, and then continue to consume.

You can try to put the consumer's lock.lock();/lock.unlock() into the while loop, in this case, since the lock is unfair, it may be consumed by both consumers.

CodePudding user response:

Unfortunately you have implemented the producer-consumer with a dependency on both the queue size and locks, and the queue access is not thread-safe. The implementation is unlikely to work properly as producer could call signalAll several times when neither consumer is using read.

Because your producer starts first it could either add the queue so that both consumers see q.size() > 0 when they start-up or they might both see q.size() == 0 and miss the producer saying read.signalAll() before they enter read.await(). Thus this while loop below is not doing as you expect in both consumers:

while (q.size() == 0) {
    read.await();
}

To demonstrate the above, switch the new Threads around and start up both Consumer first BEFORE the Producer, and change the consumer so they rely only on only the read locks not queue size to determine when to read the queue.

//  while (q.size() == 0) {
    read.await();
//  }

That avoids not-threadsafe access to q.size() and q.poll() but still may fail if both consumers do not hit read.await() so you might need to sleep before starting the producer.

A better (and reliable) solution would replace locks with thread-safe Queue such as new ArrayBlockingQueue<>(MAX_ELEMENTS) and use q.put() / q.take() to ensure consumers are exactly handling everything from the producer, and then you won't need to worry about the order of producer-consumer startup (nor add a sleep before producer start-up).

Also note that System.out in these threads introduces some inter-thread synchronisation (see PrintWriter source code) so the results may not be same without your logging.

  • Related