Is this the correct way of implementing the Consumer Producer problem with multiple Producers?


I'm new to multithreaded programming and I was wondering if there are any best practices for implementing the Consumer Producer problem with multiple Producers.

This is my current implementation and it seems to work fine. My main doubt is about the use of mtx.lock() (for example, whether I should use a lock_guard instead, and if so, what the advantages would be).

Moreover, is it OK to put g_mtx.unlock() right before g_cv.notify_one()? I don't really understand who acquires the mutex and when; my understanding is that the consumer acquires g_mtx right before g_cv.wait and then releases it when the condition_variable predicate is checked (and is false), then when it gets notified it rechecks the predicate and, if it is true, reacquires the mutex. Is that right?


#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <cstdint>

std::atomic<int> g_n = 0;
static std::condition_variable g_cv;
static std::mutex g_mtx;
static bool g_notified=false;


void consumerFunction(){
    while(g_n==0);
    while(g_n>0){
        std::unique_lock<std::mutex> g_lock(g_mtx);
        std::cout << "[CONSUMER] Waiting for notification..." << std::endl;
        g_cv.wait(g_lock, []{
            std::cout << "[CONSUMER] Notification received!" << std::endl;
            return g_notified;
        });
        g_n-=1;
        std::cout << "[CONSUMER] One less Producer!" << std::endl;
        g_notified = false;
    }
    std::cout << "[CONSUMER] CONSUMER FUNCTION HAS COMPLETED ITS WORK" << std::endl;
}

void producerFunction(){
    for(uint32_t i = 0; i < 10; ++i){
        // Count until 10...
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    g_mtx.lock();
    g_notified=true;
    g_mtx.unlock();
    std::cout << "[PRODUCER] Notifying..." << std::endl;
    g_cv.notify_one();


}

int main(){
    std::thread consumer([]{consumerFunction();});

    g_n=4;

    std::vector<std::thread> threads;
    for(int i = 0; i < g_n; ++i){
        std::this_thread::sleep_for(std::chrono::seconds(5));
        threads.emplace_back([]{producerFunction();});
    }


    consumer.join();
    for(auto& t: threads){
        if(t.joinable()){
            t.join();
        }
    }
    return 0;
}

Note: If you try to compile this code in your environment, you should build it in Debug mode to avoid optimizations (not sure if that's the case here, but it happened to me before when defining dummy producer functions).

Thanks

CodePudding user response:

Overall, this looks fine to me.

The spin wait while(g_n==0); is a bit expensive, but I guess this is part of the example and will not be used in production. Otherwise, it is better to replace it with a passive waiting approach (typically another condition variable or a semaphore).

for example if I should use a lock_guard instead, and in case what are the advantages for that

It can be a bit shorter and possibly a bit safer to use it, but it does not have a significant impact on this specific code. Guards are good for ensuring the lock is released even in complex/unexpected cases, like when an exception occurs (so not much of a concern here). Whether you should use it is a matter of opinion.

is it ok to put g_mtx.unlock() right before g_cv.notify_one();

With the current code, g_cv.wait can spuriously wake up and check the condition just after g_mtx.unlock() is called and before g_cv.notify_one(). In that case, the checking lambda is called and returns true, the print can become visible, and the lock can then be released and acquired again before the next call to g_cv.wait, which releases the lock just before entering sleep. All of this can happen before g_cv.notify_one() is even called! That being said, the call to g_cv.notify_one() will not be able to fully wake up this new call to g_cv.wait: the notification wakes the wait just like a spurious wakeup would, but g_notified will be false at that point, so the wait will not complete. Thus, this is fine.

If you put the notification before the unlock, then the waiting thread cannot be completely awakened, because the lock needs to be acquired before the check lambda is called. On some platforms, this can cause the waiting thread to wake up due to the notification and then immediately block waiting for the mutex, resulting in unnecessary context switches. To quote the C++ documentation:

The notifying thread does not need to hold the lock on the same mutex as the one held by the waiting thread(s); in fact doing so is a pessimization, since the notified thread would immediately block again, waiting for the notifying thread to release the lock. However, some implementations (in particular many implementations of pthreads) recognize this situation and avoid this "hurry up and wait" scenario by transferring the waiting thread from the condition variable's queue directly to the queue of the mutex within the notify call, without waking it up.

my understanding is that the consumer acquires g_mtx right before the g_cv.wait and then releases it when the conditional_variable condition is checked (and is false), then when it gets notified it recheck the variable and if it is true it reacquires the mutex. Is it right?

Yes. g_cv.wait atomically releases g_mtx and puts the thread to sleep. However, AFAIK, the mutex is acquired before the checking lambda is called. This is important since g_notified is protected by the lock and must be accessed in a locked section. More specifically, the mutex ensures that reads of g_notified are correct, thanks to the implicit memory barrier generated by acquiring/releasing the mutex. The mutex is released if the checking lambda returns false; if so, the thread is put to sleep. In the end, all accesses to g_notified in the code are protected (except the initialization, which does not need to be).
