Home > Mobile >  Semaphores not avoiding thread loss
Semaphores not avoiding thread loss

Time:11-28

this is my first question here so please bear with me.

I am currently working on a UNI assignment on multithreading and concurrency in Java where we are asked to implement various versions of a "Call Center" using different thread locking methods, with one of them being Semaphores. I'll get right into the code to show what my problem is:

Producer Class:

public final class Caller implements Runnable {
    private final CallCenter callCenter;

    public Caller(long id, CallCenter callCenter) {
        this.callCenter = callCenter;
    }

    @Override
    public void run() {
        try {
            callCenter.receive(new Call());
        } catch(Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}

Consumer Class:

public final class Operator implements Runnable {
    private final CallCenter callCenter;
    private Call call;

    public Operator(CallCenter callCenter) {
        this.callCenter = callCenter;
    }

    @Override
    public void run() {
        try {
            this.call = callCenter.answer();
        } catch(InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    public Call getCall() {
        return this.call;
    }   
}

Service:

import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.LinkedList;

public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
    private final Queue<Call> pendingCalls = new LinkedList<Call>();
    private Semaphore semaphore = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);

    public void receive(Call call) throws Exception {
        semaphore.acquire();
        pendingCalls.add(call);
    }

    public Call answer() throws InterruptedException {
        semaphore.release();
        return pendingCalls.poll();
    }
}

Call Implementation:

import java.util.concurrent.atomic.AtomicLong;

public final class Call {
    private static final AtomicLong currentId = new AtomicLong();
    private final long id = currentId.getAndIncrement();

    public long getId() {
        return id;
    }
}

Disclaimer

I know I am probably not using the semaphore the way it is intended to be used, but reading the official docs an other blogs/answers does not help at all. We have the following constraints: only modify the Service Class, solve using Semaphores and only use Semaphore.acquire() and Semaphore.receive() to avoid racing and busy waiting, no other method or thread-locking structure is allowed

Actual Problem:

I'll avoid posting here the entirety of the tests written by our professor, just know that 100 calls are sent to the Service, for simplicity each caller only calls once and each operator only responds once. When implementing the callcenter without semaphores you'll get busy waits generated by a while loop and concurrency is not well-managed as some calls can be answered twice or more if the different threads act simultaneously. The mission here is to eliminate busy waits and ensure each call is received and answered only once. I tried using semaphores as reported above, and while busy wait is eliminated some of the calls end up not being answered at all. Any advice on what I am doing wrong? How do I ensure that each and every call is answered only once?

CodePudding user response:

In the end, I did it using three semaphores. The first semaphore new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true) guards the queue in the sense of blocking new entries when pendingCalls.size() >= MAX_NUMBER_OF_PENDING_CALLS . The second semaphore new Semaphore(1, true) guards the producer threads, allowing just one thread at a time to access the queue for adding operations. The third and last semaphore starts with no permits and waits for the first producer thread to insert the first call into the buffer new Semaphore(0, true) .

Code

public final class BoundedCallCenterSemaphore implements BoundedCallCenter {
    private final LinkedList<Call> pendingCalls = new LinkedList<Call>();
    static Semaphore receiver = new Semaphore(1, true);
    static Semaphore storage = new Semaphore(MAX_NUMBER_OF_PENDING_CALLS, true);
    static Semaphore operants = new Semaphore(0, true);

    public void receive(Call call) throws Exception {
        try {
            storage.acquire();
        }
        catch (InterruptedException e)
        {

        }
        try {
            receiver.acquire();
        }
        catch (InterruptedException e)
        {

        }
        synchronized (pendingCalls) {
            pendingCalls.add(call);
            operants.release();
        }
    }

    public Call answer() throws InterruptedException {
        try
        {
            operants.acquire();
        }
        catch (InterruptedException e)
        {

        }
        Call call = null;
        synchronized (pendingCalls) {
            call = pendingCalls.poll();
            storage.release();
            receiver.release();
        }
        return call;
    }
}
  • Related