Home > Mobile >  C 11 Simple Producer Consumer Multithreading
C 11 Simple Producer Consumer Multithreading

Time:10-06

I am trying to teach myself multithreading and I followed this tutorial here: https://www.classes.cs.uchicago.edu/archive/2013/spring/12300-1/labs/lab6/

If you scroll all the way to the bottom there is a sample snippet of a producer-consumer and it asks us to solve the race conditions found in this code:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

int main() {
    int c = 0;
    bool done = false;
    queue<int> goods;

    thread producer([&]() {
        for (int i = 0; i < 500;   i) {
            goods.push(i);
            c  ;
        }

        done = true;
    });

    thread consumer([&]() {
        while (!done) {
            while (!goods.empty()) {
                goods.pop();
                c--;
            }
        }
    });

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;
}

The Net value at the end should be 0, here is my attempt at it:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
#include <atomic>
using namespace std;


int main() {

    int c = 0;

    bool done = false;
    queue<int> goods;
    mutex mtx;  
    condition_variable cond_var;

    // thread to produce 500 elements
    thread producer([&]() {

        for (int i = 0; i < 500;   i) {
            // lock critical secion
            unique_lock<mutex> lock(mtx);   

            goods.push(i);
            c  ;
            lock.unlock();

            // notify consumer that data has been produced
            cond_var.notify_one();
        }

        // notify the consumer that it is done
        done = true;
        cond_var.notify_one();


    });

    // thread to consume all elements
    thread consumer([&]() {

        while (!done) {
            unique_lock<mutex> lock(mtx);   
            while (!goods.empty()) {
                goods.pop();
                c--;
            }
            // unlocks lock and wait until something in producer gets put
            cond_var.wait(lock);
        }
    });

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;
}

I feel like I am fundamentally missing something. I believe the biggest problem I am having is in the consumer with the cond_var.wait() because if the producer sets "done" to true then the consumer won't go back into the while(!goods.empty()). I am not sure how to fix it though.

Any hints, explanations or even different approaches would be appreciated!

CodePudding user response:

Producer:

thread producer([&]() {

    for (int i = 0; i < 500;   i)
    {
        {
            // Just have a lock while interacting with shared items.
            unique_lock<mutex> lock(mtx);   
            goods.push(i);
            c  ;
        }
        cond_var.notify_one();
    }

    // Lock to update shared state.
    unique_lock<mutex> lock(mtx);   
    done = true;
    cond_var.notify_one();
});

Consumer

thread consumer([&]() {

    // This loop exits when
    //        done          => true
    //   AND  goods.empty() => true

    // Acquire lock before checking shared state.
    unique_lock<mutex> lock(mtx);

    while (!(done && goods.empty()))
    {
        // Wait until there is something in the queue to processes
        // releasing lock while we wait.
        // Break out if we are done or goods is not empty.
        cond_var.wait(lock, [&](){return done || !goods.empty();});

        // You now have the lock again, so modify shared state is allowed
        // But there is a possibility of no goods being available.
        // So let's check before doing work.
        if (!goods.empty())
        {
            goods.pop();
            c--;
        }
    }
});

Alternatively if we are simply solving for race condition. We can simply check on the state of done and make sure no other variables have interactions.

Producer:

thread producer([&]() {

    // The consumer is not allowed to touch goods
    // until you are finished. So just use with
    // no locks.
    for (int i = 0; i < 500;   i)
    {
        goods.push(i);
        c  ;
    }

    // Lock to update shared state.
    // Tell consumer we are ready for processing.
    unique_lock<mutex> lock(mtx);   
    done = true;
    cond_var.notify_one();
});

Consumer

thread consumer([&]() {

    // Acquire lock before checking shared state.
    unique_lock<mutex> lock(mtx);
    cond_var.wait(lock, [&](){return done;});

    // We now know the consumer has finished all updates.
    // So we can simply loop over the goods and processes them
    while (!goods.empty())
    {
        goods.pop();
        c--;
    }
});
  • Related