Home > Software engineering >  Semaphore in Java. producer-consumer problem printer
Semaphore in Java. producer-consumer problem printer

Time:12-14


package producerconsumer;
 
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public  class Buffer {
    int t;
    private final Semaphore notEmptyy  = new Semaphore(0); // prevent underflow
      private final Semaphore notFulll = new Semaphore(10); // prevent overflow
   private int itemn;
    private int itemb;
     int count=0;
     int buffersize =10 ;
    private final  BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(buffersize); 
    private final Semaphore mutex  = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty  = new Semaphore(0); // prevent underflow
      private final Semaphore notFull = new Semaphore(10); // prevent overflow
    
          
    

     
    public    Buffer(){
          
    }
 public   void add( int x) throws InterruptedException{
 while(count== buffersize )  
     notFulll.acquire();
 System.out.println("user printer-request,: "  Thread.currentThread().getName()   "  "   x); 
  queue.offer(x);
     count  ;
  notEmptyy.release();


 }   
    public int take() throws InterruptedException{
        
        while(queue.isEmpty())  
     notEmptyy.acquire();
    t=queue.take();
     count--;
  notFulll.release();
  return t; 

    }
    
 
       public   void put( ){    
while(true){
       try  {
           Random random = new Random();
            int data = random.nextInt(100);
        notFull .acquire();
        mutex .acquire();
         add(data);
          mutex .release();
           notEmpty .release();
        //   itemb = data;
        } catch (InterruptedException e){
        System.out.println("InterruptedException caught ");
      }   
    //  System.out.println("user printer-request,: "  Thread.currentThread().getName()   "  "   itemb); 
    //   this.item = item; 
      
    }
}
       

    public void  get(){
     while(true){ 
      try{
          notEmpty .acquire();
          mutex .acquire();
            itemn = take();
           mutex .release();
           notFull .release();
           queue.remove(itemn);
            System.out.println("print request, : " Thread.currentThread().getName() " "   itemn );
      } catch (InterruptedException e){
        System.out.println("InterruptedException caught ");
      } 
     
     
    }   
   } 
}


package producerconsumer;


 
 
public class producers implements Runnable{
    private static final int DELAY = 500;
    private Buffer osLabGroup;
public producers(Buffer buffer){
        osLabGroup = buffer;
    }
 

    
    public void run(){
  //    while(true){
           
        osLabGroup.put();
        try{
            
            Thread.sleep(DELAY);
        }catch (InterruptedException exception) {}
    }
}
package producerconsumer;


public class consumers implements Runnable{
    private static final int DELAY = 1000;
    private Buffer osLabGroup;
public consumers(Buffer buffer){
        osLabGroup = buffer;
    }

    
    public void run(){
      //  while(true){
        osLabGroup.get();
        try{
           
            Thread.sleep(DELAY);
        }catch (InterruptedException exception) {}
    }
}
//}
package producerconsumer;

public class ProducerConsumer {

  
    public static void main(String[] args) {

        
   Buffer buffer = new Buffer();
      
            producers p1 = new producers(buffer);
            consumers c1 = new consumers(buffer);
             producers p2 = new producers(buffer);
            consumers c2 = new consumers(buffer);
             producers p3 = new producers(buffer);
            consumers c3 = new consumers(buffer);
            Thread pr1 = new Thread(p1);
            Thread co1 = new Thread(c1);
            Thread pr2 = new Thread(p2);
            Thread co2 = new Thread(c2);
            Thread pr3 = new Thread(p3);
            Thread co3 = new Thread(c3);
             pr1.setName("p1");
              co1.setName("c1");
               pr2.setName("p2");
                co2.setName("c2");
                 pr3.setName("p3");
                  co3.setName("c3");
                   
            pr1.start();
            co1.start(); 
            pr2.start();
            co2.start();
            pr3.start();
            co3.start();
        }
    }

I have a program that simulates the print jobs of a printer, one job at a time: either from Producer or Consumer and it shares an object called a Buffer.

in the buffer it has Set1 of threads, producers, put data with a ½ second delay.

also it does have a Set2 of threads, consumers, read data from the buffer with one second delay.

as it stands, I am trying to follow this guy https://www.youtube.com/watch?v=nxw2y27z0V4&t=1207s

using semaphore this is my code I problem my output is not accurate enter image description here

this my edited code

package producerconsumer;
 
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public  class Buffer {
    int t;
    private final Semaphore notEmptyy  = new Semaphore(0); // prevent underflow
      private final Semaphore notFulll = new Semaphore(10); // prevent overflow
   private int itemn;
    private int itemb;
     int count=0;
     int buffersize =10 ;
     private final  ArrayList<Integer> list = new ArrayList<Integer>(buffersize); 
    private final  LinkedList<Integer> queue = new LinkedList<Integer>(list); 
    private final Semaphore mutex  = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty  = new Semaphore(0); // prevent underflow
      private final Semaphore notFull = new Semaphore(10); // prevent overflow
    
          
    

     
    public    Buffer(){
          
    }
 public   void add( int x) throws InterruptedException{
 while(count== buffersize )  
     notFulll.acquire();
 System.out.println("user printer-request,: "  Thread.currentThread().getName()   "  "   x); 
  queue.offer(x);
     count  ;
  notEmptyy.release();


 }   
    public   int take() throws InterruptedException{
        
        while(count ==0)  
     notEmptyy.acquire();
    t=queue.pollFirst();
     count--;
  notFulll.release();
  return t; 

    }
    
 
       public   void put( ){    
while(true){
       try  {
           Random random = new Random();
            int data = random.nextInt(100);
        notFull.acquire();
        mutex.acquire();
         add(data);
          mutex.release();
           notEmpty.release();
        //   itemb = data;
        } catch (InterruptedException e){
        System.out.println("InterruptedException caught ");
      }   
    //  System.out.println("user printer-request,: "  Thread.currentThread().getName()   "  "   itemb); 
    //   this.item = item; 
      
    }
}
       

    public   void  get(){
     while(true){ 
      try{
          notEmpty.acquire();
          mutex.acquire();
            itemn = take();
           mutex.release();
           notFull.release();
         // queue.remove(itemn);
            System.out.println("print request, : " Thread.currentThread().getName() " "   itemn );
      } catch (InterruptedException e){
        System.out.println("InterruptedException caught ");
      } 
     
     
    }   
   } 
}


i get this output enter image description here and this output enter image description here

CodePudding user response:

In Buffer#get, when you take an item from queue, you call queue#remove again. This is a wrong behavior. You should consume this item instead of operating the queue outside of the concurrency control code. This can lead to abnormal behaviors, such as deadlock.

try {
    notEmpty .acquire();
    mutex .acquire();
    itemn = take();
    mutex .release();
    notFull .release();
    // why remove from queue again? It is already taken from queue. Comment this line
    // queue.remove(itemn);
    System.out.println("print request, : " Thread.currentThread().getName() " "   itemn );
} catch (InterruptedException e){
    System.out.println("InterruptedException caught ");
}

In addition, in your program, you should replace ArrayBlockingQueue with a collection class under java.util package, such as LinkedList.

ArrayBlockingQueue you use is actually a thread-safe blocking queue. You can directly use take&put to complete this program without other concurrency control.

EDIT:

As mentioned in the video introduction, there are two ways to solve the producer-consumer problem, semaphore or monitor. Your code seems to mix these two together. There is no need to do any concurrency control in your add and take method.

Code should be:

public class consumers implements Runnable{
    private static final int DELAY = 1000;
    private Buffer osLabGroup;
    public consumers(Buffer buffer){
        osLabGroup = buffer;
    }

    public void run(){
        while (true) {
            osLabGroup.get();
            try{
                Thread.sleep(DELAY);
            }catch (InterruptedException exception) {}
        }
    }
}

public class producers implements Runnable{
    private static final int DELAY = 500;
    private Buffer osLabGroup;
    public producers(Buffer buffer){
        osLabGroup = buffer;
    }
    public void run(){
           while(true) {
               osLabGroup.put();
               try {
                   Thread.sleep(DELAY);
               } catch (InterruptedException exception) {
               }
           }
    }
}

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;

public  class Buffer {
    int buffersize =10 ;
    private final  LinkedList<Integer> queue = new LinkedList<Integer>();
    private final Semaphore mutex  = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty  = new Semaphore(0); // prevent underflow
    private final Semaphore notFull = new Semaphore(buffersize); // prevent overflow

    public Buffer(){
    }
    public void add( int x) throws InterruptedException{
        System.out.println("user printer-request,: "  Thread.currentThread().getName()   "  "   x   ","   System.currentTimeMillis());
        queue.add(x);
    }
    public int take() throws InterruptedException{
        Integer first = queue.removeFirst();
        System.out.println("print request, : " Thread.currentThread().getName() " "   first   ","   System.currentTimeMillis() );
        return first;
    }

    public  void put(){
        try  {
            Random random = new Random();
            int data = random.nextInt(100);
            notFull.acquire();
            mutex.acquire();
            add(data);
            mutex.release();
            notEmpty.release();
        } catch (InterruptedException e){
            System.out.println("InterruptedException caught ");
        }
    }

    public void  get(){
        try{
            notEmpty.acquire();
            mutex.acquire();
            int take = take();
            mutex.release();
            notFull.release();
        } catch (InterruptedException e){
            System.out.println("InterruptedException caught ");
        }
    }
}
  • Related