I am currently learning Multithreading in Java and there I learned about the producer-consumer problem. In which the producer is producing and the consumer is consuming from the queue irrespective of the number of producers or consumers on the shared buffered queue.
But while trying to solve this problem, there is a very strange issue that arises.
Code for Storage.java:
class Storage{
int[] buffer;
int index;
final Object lock = new Object();
public Storage(int n){
buffer = new int[n];
Arrays.fill(buffer, -1);
index = 0;
}
public void addValueInBuffer(int value){
synchronized (lock) {
if (index >= buffer.length) {
System.out.println("Thread " Thread.currentThread().getName()
" producer need to wait" " with index " index);
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
System.out.println("Thread " Thread.currentThread().getName()
" is adding value " value " with index " index);
buffer[index] = value;
index ;
lock.notifyAll();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void consumeValueFromBuffer(){
synchronized (lock){
if(index <= 0){
System.out.println("Thread " Thread.currentThread().getName()
" consumer need to wait" " with index " index);
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
System.out.println("Thread " Thread.currentThread().getName()
" is consuming value " " with index " index);
index--;
buffer[index] = -1;
lock.notifyAll();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public int totalConsumption(){
int count = 0;
for (int i = 0; i < buffer.length; i ) {
if(buffer[i]==0){
count ;
}
}
return count;
}
}
Code for Producer.java
class Producer{
Storage storage;
public Producer(Storage storage){
this.storage = storage;
}
public void addValue(int value){
this.storage.addValueInBuffer(value);
}}
Code for Consumer.java
class Consumer{
Storage storage;
public Consumer(Storage storage){
this.storage = storage;
}
private void consumeValue(){
this.storage.consumeValueFromBuffer();
}}
Code for ApplicationRunner.java
public class ApplicationRunner {
public static void main(String[] args) throws InterruptedException {
int n = 1;
Storage storage = new Storage(n);
Producer producer = new Producer(storage);
Consumer consumer = new Consumer(storage);
Thread producerThread = new Thread(()-> {
for (int i = 0; i < 5; i ) {
producer.addValue(0);
}
});
Thread producerThread2 = new Thread(()-> {
for (int i = 0; i < 10; i ) {
producer.addValue(1);
}
});
Thread consumerThread = new Thread(consumer);
Thread consumerThread2 = new Thread(consumer);
producerThread.start();
producerThread2.start();
consumerThread.start();
consumerThread2.start();
producerThread.join();
producerThread2.join();
consumerThread.join();
consumerThread2.join();
System.out.println("Storage is " storage.totalConsumption() " which should be zero!");
}}
SharedBuffered Queue is only having 1 storage so to simulate the special behavior. 2 producers are producing a total of 15 data and the 2 Consumers are consuming in an infinite loop.
Output from the above program is:
Thread Thread-0 is adding value 0 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-0 is adding value 0 with index 0
Thread Thread-3 is consuming value with index 0
Thread Thread-0 is adding value 0 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-0 is adding value 0 with index 0
Thread Thread-3 is consuming value with index 0
Thread Thread-0 is adding value 0 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-3 consumer need to wait with index 0
Thread Thread-2 consumer need to wait with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-1 producer need to wait with index 1
Thread Thread-2 is consuming value with index 0
Thread Thread-3 consumer need to wait with index 0
Thread Thread-2 consumer need to wait with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-3 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-3 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-2 is consuming value with index 0
Thread Thread-1 is adding value 1 with index 0
Thread Thread-3 is consuming value with index 0
Thread Thread-2 consumer need to wait with index 0
Thread Thread-3 consumer need to wait with index 0
I expect that Total Producer will be producing 15 items and consumer will be consuming 15 items. But this is not the case and what I observe is that the Producer is producing 8 items only and the consumer is consuming 8 items, which is I suppose because of the If-Else condition.
I don't understand this unusual behavior, Do I miss something while making code synchronization or Indexing of buffering?
CodePudding user response:
Not a full analysis but this bit at least looks wrong to me:
synchronized (lock) {
if (index >= buffer.length) {
System.out.println("Thread " Thread.currentThread().getName()
" producer need to wait" " with index " index);
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
The problem here is the else
. If the buffer is full, you wait, but then do nothing else. In other words because the buffer is full the else
block won't be executed, so the value to be added never is.
There's also a second error: if (index >= buffer.length)
Nope. If there are multiple producers, a thread adding values could be woken up and the buffer could still be full because another thread added a value before your thread did. Always test conditions like this in a loop: while( index >= buffer.length ) lock.wait();
so that you don't exit the loop until the condition you need to proceed is true.
Untested:
public void addValueInBuffer(int value) throws InterruptedException {
synchronized (lock) {
while(index >= buffer.length) lock.wait();
buffer[index] = value;
index ;
// you probably also need index = index % buffer.length
lock.notifyAll();
}
}