java concurrency: BlockingQueue based producer/consumer does not seem to work well with composite ac

Time:11-07

It came as a surprise to me when I tried to implement some composite actions with a BlockingQueue-based producer/consumer pattern, which makes me think I have most likely missed something obvious.

1. In short

I need

  • my consumer to make the sequence of actions 'take obj from the queue + do more consumer operations on the obj' atomic, and
  • my producer to make the sequence of actions 'offer obj onto the queue + do more producer operations on the obj' atomic, and
  • the two atomic sequences above to be synchronized on the same obj, obviously

Without such atomicity, problems may occur; see 'PROBLEM!!' in the comment in the producer code in section 2 below for an example.

But I can't simply put a synchronized block around the call to take() and its associated consumer operations. When the queue is empty, the consumer would be stuck there FOREVER: it would still hold the sync lock while waiting for the producer to fill the queue with an obj, and that in turn would stop the producer from entering the corresponding critical region to do any 'producing'.

2. Specifically, simplified example code is as follows:

Common code known to the producer and consumer classes:

Queue<QObj> nbq = new ConcurrentLinkedQueue<>();
BlockingQueue<QObj> bq = new LinkedBlockingQueue<>();
List<String> idList = new LinkedList<>();
Object lockObj = idList;
int Idx = 1;

public static class QObj {
    public String id;
    public String content;

    public QObj(String id, String content) {
        this.id = id;
        this.content = content;
    }
}

Main logic in producer class:

    public void produceBlocking() {
        QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
//        synchronized(lockObj) {
//        no point to include Queue.offer(...) call in a synchronized block as we
//        won't be able to use synchronized() in corresponding consumer anyway
//        for the reason described above
            bq.offer(o);
            synchronized (lockObj) {
                // PROBLEM!! by now, 'o' could have been 'consumed' already
                //  hence we shouldn't do the following operations:

                // do the associated part of composite action of 'producer'
                idList.add(o.id);
                // do some more operation as part of this composite action ...
            }
//        }
    }

Main logic in consumer class:

    public void consumeBlocking() {
        while (true) {
            try {
//                synchronized (lockObj) {
//                can't simply put synchronized() here to make the following composite action atomic
//                  - when the queue is empty, this consumer will be stuck here forever since it still possesses
//                      the lockObj, which stops the producer from entering the critical region to do any 'producing'
                    QObj o = bq.take();

                    synchronized (lockObj) {
                        // do the associated part of composite action of 'consumer'
                        idList.remove(o.id);
                        // do some more operation as part of this composite action ...
                    }
//                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

3. Why has this not been a common problem?

I feel that this must be a commonly occurring problem for people using BlockingQueue, and the fact that I couldn't find anything that directly addresses a similar problem reinforces my belief that I have got something fundamentally wrong.

Can someone give a hint about a direct solution, or point out where my thinking about this problem went wrong?

4. Alternative Ideas

I did think of a few alternatives, but I feel none of them addresses this issue directly, and all have drawbacks (highlighted as 'DRAWBACK!!' in the code comments).

4.1 - Do a check using Queue.contains() before continuing

public void produceBlockingWithCheck() {
    QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
    bq.offer(o);
    synchronized (lockObj) {
        // First, Check if the obj could have already been consumed

        // DRAWBACK!!: this could be very costly, e.g.
        //  when 'bq' is a LinkedBlockingQueue, and contains(...) always triggers
        //  a sequential traversal, the Queue itself can be very large

        if (bq.contains(o)) {
            // do the associated part of composite action of 'producer'
            idList.add(o.id);
            // do some more operation as part of this composite action ...
        }
    }
}

4.2 - Adjust the order of ops on the producer, move the Queue.offer() call to the end

public void produceBlockingOrderAdjusted() {
    QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
    // do the associated part of composite action of 'producer', only before
    // calling BlockingQueue.offer(...)

    //  DRAWBACK!!: even if this works for this simple case, such a reordering
    //  won't be logically possible in all cases, will it?

    synchronized (lockObj) {
        idList.add(o.id);
        // do some more operation as part of this composite action ...
    }
    bq.offer(o);
}

4.3 - Use non-blocking queues instead.


public void produceNonBlocking() {
    QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
    synchronized(lockObj) {
        nbq.offer(o);
        // do the associated part of composite action of 'producer'
        idList.add(o.id);
        // do some more operation as part of this composite action ...
    }
}

public void consumeNonBlocking() {
    while (true) {
        synchronized (lockObj) {
            // kind of doing our own blocking.
            QObj o = nbq.poll();
            if (o != null) {
                // do the associated part of composite action of 'consumer'
                idList.remove(o.id);
                // do some more operation as part of this composite action ...
            }

            // DRAWBACK!!: if the 'producers' don't produce faster than the 'consumers' consuming,
            // this 'miss' could be happening too often and get costly
        }
    }
}

CodePudding user response:

Why has this not been a common problem?

Multi-threading is like the old board game "Othello," which was marketed with the tag line "A minute to learn, a lifetime to master." Modern threading libraries make it easy to get started writing multi-threaded code, but it's not easy to design algorithms that use multi-threading effectively. Sometimes, the same design principles that underlie efficient single-threaded algorithms can be completely inappropriate in multi-threaded code.

An experienced designer knows that when thread A puts some object in a queue to be "consumed" by thread B, it's best to let thread A be done with that object for good. Simply taking the object out of the queue should be enough for thread B to have exclusive use of it. If you can't do that without adding complexity to your design,... Well, that's the price you pay for using multiple threads.
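The hand-off described above can be sketched roughly as follows. This is only an illustration under assumptions: the `HandOffDemo` class, the `Job` type, and the `runDemo` method are hypothetical names, not part of the question's code. The producer finishes all of its own work on an object before offering it, and after the consumer takes the object, only the consumer touches it, so no extra lock is needed around the queue calls.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of "the producer is done with the object once it is queued".
class HandOffDemo {
    // Illustrative immutable message type (stands in for the question's QObj).
    static final class Job {
        final String id;
        final String content;

        Job(String id, String content) {
            this.id = id;
            this.content = content;
        }
    }

    // Produces three jobs, consumes them on another thread, returns the ids seen.
    static String runDemo() {
        BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
        StringBuilder seen = new StringBuilder(); // used only by the consumer until join()

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    Job job = queue.take(); // from here on, only this thread uses 'job'
                    seen.append(job.id).append(';');
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= 3; i++) {
            // Finish every producer-side step first ...
            Job job = new Job(String.valueOf(i), "Content_" + i);
            // ... then hand the object off and never touch it again.
            queue.offer(job);
        }

        try {
            consumer.join(); // join() makes the consumer's writes to 'seen' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With a single producer and a single consumer the queue preserves FIFO order, so the demo returns the ids in the order they were offered.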

A multi-threaded, parallel computation that's only half as efficient as a single-threaded implementation could still run four times as fast on an eight-core machine.

CodePudding user response:

I need

  • my consumer to make the sequence of actions 'take obj from the queue + do more consumer operations on the obj' atomic, and
  • my producer to make the sequence of actions 'offer obj onto the queue + do more producer operations on the obj' atomic, and
  • the two atomic sequences above to be synchronized on the same obj, obviously

You can use wait/notifyAll for that.
Try reading this article: it explains wait/notifyAll in detail.

But I can't simply put a synchronized block around the call to take() and its associated consumer operations. When the queue is empty, the consumer would be stuck there FOREVER: it would still hold the sync lock while waiting for the producer to fill the queue with an obj, and that in turn would stop the producer from entering the corresponding critical region to do any 'producing'.

wait/notifyAll solves this problem because a thread waiting inside wait() releases the lock, and reacquires it before wait() returns.
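A minimal sketch of that idea, assuming a plain (non-blocking) queue guarded by a single lock object. The class name `GuardedQueue` and its methods are hypothetical, and string ids stand in for the question's QObj. Because wait() gives up the lock while the queue is empty, the producer can still enter its own synchronized block.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: one lock guards both the queue and the id bookkeeping.
class GuardedQueue {
    private final Object lock = new Object();
    private final Queue<String> queue = new ArrayDeque<>(); // not thread-safe by itself
    private final List<String> idList = new ArrayList<>();  // guarded by 'lock' as well

    // 'offer + producer bookkeeping' performed as one atomic step.
    public void produce(String id) {
        synchronized (lock) {
            queue.add(id);
            idList.add(id);
            lock.notifyAll(); // wake up any consumer waiting for an element
        }
    }

    // 'take + consumer bookkeeping' performed as one atomic step.
    public String consume() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait(); // releases 'lock' while waiting, reacquires before returning
            }
            String id = queue.remove();
            idList.remove(id);
            return id;
        }
    }

    public int pendingIds() {
        synchronized (lock) {
            return idList.size();
        }
    }

    // Demo: returns how many ids are still tracked once everything has been consumed.
    static int runDemo(int items) {
        GuardedQueue q = new GuardedQueue();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    q.consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < items; i++) {
            q.produce(String.valueOf(i));
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return q.pendingIds();
    }
}
```

The wait() call sits in a while loop rather than an if, so the condition is rechecked after every wake-up, which also covers spurious wake-ups.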

You can also look at the Condition javadocs.
Condition is the same concept as wait/notify, but for the Lock interface (a more flexible and powerful version of synchronized).
Also look at the BoundedBuffer example in those javadocs; it seems like it could be modified to do what you want in your code.
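As a rough illustration of the Lock/Condition variant, here is a sketch loosely modelled on that idea but simplified to an unbounded queue, so only a "not empty" condition is needed. The class `ConditionQueue` and its methods are hypothetical names, and string ids stand in for the question's QObj; this is not the javadoc example itself.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: composite producer/consumer actions under one ReentrantLock.
class ConditionQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>(); // guarded by 'lock'
    private final Set<String> ids = new HashSet<>();        // guarded by 'lock'

    // 'offer + producer bookkeeping' as one atomic step.
    public void produce(String id) {
        lock.lock();
        try {
            queue.add(id);
            ids.add(id);
            notEmpty.signal(); // one new element, so waking one consumer is enough
        } finally {
            lock.unlock();
        }
    }

    // 'take + consumer bookkeeping' as one atomic step.
    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // releases the lock while waiting
            }
            String id = queue.remove();
            ids.remove(id);
            return id;
        } finally {
            lock.unlock();
        }
    }

    public int pendingIds() {
        lock.lock();
        try {
            return ids.size();
        } finally {
            lock.unlock();
        }
    }

    // Demo: returns how many ids are still tracked once everything has been consumed.
    static int runDemo(int items) {
        ConditionQueue q = new ConditionQueue();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    q.consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < items; i++) {
            q.produce(String.valueOf(i));
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return q.pendingIds();
    }
}
```

A bounded version would add a second condition (for example "not full") that producers await on, which is where the BoundedBuffer example in the javadocs becomes the closer model.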
