Home > front end >  One producer, multiple consumers. How to use condition variables with unbounded buffer?
One producer, multiple consumers. How to use condition variables with unbounded buffer?

Time:12-03

Despite the number of similar questions on Stackoverflow, I can't come up with a solution for the following Producer-Consumer problem:

My program has three threads:

  • One writer thread that reads from a file, saves data into a sensor_data_t struct, and writes it into a dynamic pointer-based buffer using sbuffer_insert(buffer, &sensor_data). Once this thread finishes reading it sends an end-of-stream data struct represented by data->id == 0.

  • Two reader threads that remove data from the buffer head (FIFO-style), and store it into a temporary data struct using sbuffer_remove(buffer, &data) and then print it to the cmd line for testing purposes.

I think I have to avoid at the least:

  1. My reader threads to try to consume/remove from the buffer while it is empty.
  2. My reader threads to consume/remove from the buffer at the same time.

On the other hand, I don't think my writer thread in sbuffer_insert() needs to worry if the readers are changing the head because it only appends to the tail. Is this reasoning correct or am I missing something?

Here's what I've done so far:

My main function:

sbuffer_t *buffer;

void *writer(void *fp);
void *reader(void *fp);

int main()
{
    // Initialize the buffer
    sbuffer_init(&buffer);

    // Open sensor_data file
    FILE *sensor_data_fp;
    sensor_data_fp = fopen("sensor_data", "rb");

    // Start thread for reading sensor_data file adding elements to the sbuffer
    pthread_t writer_thread;
    pthread_create(&writer_thread, NULL, &writer, sensor_data_fp);

    // Open sensor_data_out file
    FILE *sensor_data_out_fp;
    sensor_data_out_fp = fopen("sensor_data_out", "w");

    // Start thread 1 and 2 for writing sensor_data_out file
    pthread_t reader_thread1;
    pthread_t reader_thread2;
    pthread_create(&reader_thread1, NULL, &reader, sensor_data_out_fp);
    pthread_create(&reader_thread2, NULL, &reader, sensor_data_out_fp);

    // Wait for threads to finish and join them

    pthread_join(reader_thread1, NULL);
    pthread_join(reader_thread2, NULL);
    pthread_join(writer_thread, NULL);


    // Close sensor_data file
    fclose(sensor_data_fp);
    // Close sensor_data_out file
    fclose(sensor_data_out_fp);

    // free buffer
    sbuffer_free(&buffer);

    return 0;
}

My reader and writer threads:

typedef uint16_t sensor_id_t;
typedef double sensor_value_t;
typedef time_t sensor_ts_t;         // UTC timestamp as returned by time() - notice that the size of time_t is different on 32/64 bit machine

typedef struct {
    sensor_id_t id;
    sensor_value_t value;
    sensor_ts_t ts;
} sensor_data_t;



void *writer(void *fp)
{
    // cast fp to FILE
    FILE *sensor_data_fp = (FILE *)fp;

    // make char buffers of size sensor_id_t, sensor_value_t and sensor_ts_t
    char sensor_id_buffer[sizeof(sensor_id_t)];
    char sensor_value_buffer[sizeof(sensor_value_t)];
    char sensor_ts_buffer[sizeof(sensor_ts_t)];

    // parse sensor_data file into sensor_id_buffer, sensor_value_buffer and sensor_ts_buffer
    while(fread(sensor_id_buffer, sizeof(sensor_id_t), 1, sensor_data_fp) == 1 &&
    fread(sensor_value_buffer, sizeof(sensor_value_t), 1, sensor_data_fp) == 1 &&
    fread(sensor_ts_buffer, sizeof(sensor_ts_t), 1, sensor_data_fp)) {
        // create sensor_data_t
        sensor_data_t sensor_data;
        // copy sensor_id_buffer to sensor_data.id
        memcpy(&sensor_data.id, sensor_id_buffer, sizeof(sensor_id_t));
        // copy sensor_value_buffer to sensor_data.value
        memcpy(&sensor_data.value, sensor_value_buffer, sizeof(sensor_value_t));
        // copy sensor_ts_buffer to sensor_data.ts
        memcpy(&sensor_data.ts, sensor_ts_buffer, sizeof(sensor_ts_t));

        // print sensor_data for testing
        // printf("sensor_data.id: %d, sensor_data.value: %f, sensor_data.ts: %ld\n", sensor_data.id, sensor_data.value, sensor_data.ts);

        // insert sensor_data into buffer
        sbuffer_insert(buffer, &sensor_data);
    }
    // Add dummy data to buffer to signal end of file
    sensor_data_t sensor_data;
    sensor_data.id = 0;
    sensor_data.value = 0;
    sensor_data.ts = 0;
    sbuffer_insert(buffer, &sensor_data);

    return NULL;
}

void *reader(void *fp)
{
    // cast fp to FILE
    //FILE *sensor_data_out_fp = (FILE *)fp;

    // Init data as sensor_data_t
    sensor_data_t data;
    do{
        // read data from buffer
        if (sbuffer_remove(buffer, &data) == 0) { // SBUFFER_SUCCESS 0
            // write data to sensor_data_out file
            // fwrite(&data, sizeof(sensor_data_t), 1, sensor_data_out_fp);
            // print data for testing
            printf("data.id: %d, data.value: %f, data.ts: %ld \n", data.id, data.value, data.ts);
        }
    }
    while(data.id != 0);

    // free allocated memory
    // free(fp);

    return NULL;
}

Global variables and buffer initialization:

typedef struct sbuffer_node {
    struct sbuffer_node *next;  
    sensor_data_t data;       
} sbuffer_node_t;


struct sbuffer {
    sbuffer_node_t *head;     
    sbuffer_node_t *tail;       
};

pthread_mutex_t mutex;
pthread_cond_t empty, removing;
int count = 0; // reader count


int sbuffer_init(sbuffer_t **buffer) {
    *buffer = malloc(sizeof(sbuffer_t));
    if (*buffer == NULL) return SBUFFER_FAILURE;
    (*buffer)->head = NULL;
    (*buffer)->tail = NULL;

    // Initialize mutex and condition variables
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&empty, NULL);
    pthread_cond_init(&removing, NULL);

    return SBUFFER_SUCCESS;
}

sbuffer_remove (Consumer)

int sbuffer_remove(sbuffer_t *buffer, sensor_data_t *data) {
    sbuffer_node_t *dummy;
    if (buffer == NULL) return SBUFFER_FAILURE;

    // while the count is 0, wait
    pthread_mutex_lock(&mutex);
    while (count > 0) {
        pthread_cond_wait(&removing, &mutex);
    }
    pthread_mutex_unlock(&mutex);

    pthread_mutex_lock(&mutex);
    while (buffer->head == NULL){

        pthread_cond_wait(&empty, &mutex); // Wait until buffer is not empty

        if (data->id == 0){ // end-of-stream
            pthread_mutex_unlock(&mutex);
            return SBUFFER_NO_DATA;
        }
    }

    count  ;

    *data = buffer->head->data;
    dummy = buffer->head;
    if (buffer->head == buffer->tail) // buffer has only one node
    {
        buffer->head = buffer->tail = NULL;
    } else  // buffer has many nodes empty
    {
        buffer->head = buffer->head->next;
    }
    free(dummy);

    count--;

    pthread_cond_signal(&removing); // Signal that data is removed
    pthread_mutex_unlock(&mutex);

    return SBUFFER_SUCCESS;
}

sbuffer_insert (Producer)

int sbuffer_insert(sbuffer_t *buffer, sensor_data_t *data) {
    sbuffer_node_t *dummy;
    if (buffer == NULL) return SBUFFER_FAILURE;
    dummy = malloc(sizeof(sbuffer_node_t));
    if (dummy == NULL) return SBUFFER_FAILURE;
    dummy->data = *data;
    dummy->next = NULL;


    if (buffer->tail == NULL) // buffer empty (buffer->head should also be NULL
    {
        pthread_mutex_lock(&mutex);
        buffer->head = buffer->tail = dummy;
        pthread_cond_signal(&empty); // Signal that buffer is not empty
        pthread_mutex_unlock(&mutex);

    } else // buffer not empty
    {
        buffer->tail->next = dummy;
        buffer->tail = buffer->tail->next;
    }
    return SBUFFER_SUCCESS;
}

Currently, the code has very unstable behavior. Sometimes it runs and prints everything, sometimes it doesn't print anything and gets stuck in a loop, sometimes it prints everything but the last value comes after the end-of-stream code and it doesn't terminate.

I would really appreciate a solution that explains what I'm doing wrong or a comment that redirects me to a duplicate of my question.

CodePudding user response:

I think I have to avoid at the least:

  1. My reader threads to try to consume/remove from the buffer while it is empty.
  2. My reader threads to consume/remove from the buffer at the same time.

Yes, you must avoid those. And more.

On the other hand, I don't think my writer thread in sbuffer_insert() needs to worry if the readers are changing the head because it only appends to the tail. Is this reasoning correct or am I missing something?

You are missing at least that

  1. when the buffer contains fewer than two nodes, there is no distinction between the head node and the tail node. This manifests at the code level at least in the fact that your sbuffer_insert() and sbuffer_remove() functions both access both buffer->head and buffer->tail. From the perspective of synchronization requirements, it is this lower-level view that matters.

  2. Insertion and removal modifies the node objects themselves, not just the overall buffer object.

  3. Synchronization is not just, nor even primarily, about avoiding threads directly interfering with each other. It is even more about the consistency of different threads' views of memory. You need appropriate synchronization to ensure that one thread's writes to memory are (ever) observed by other threads, and to establish ordering relationships among operations on memory by different threads.

Currently, the code has very unstable behavior. Sometimes it runs and prints everything, sometimes it doesn't print anything and gets stuck in a loop, sometimes it prints everything but the last value comes after the end-of-stream code and it doesn't terminate.

This is unsurprising, because your program contains data races, and its behavior is therefore undefined.

Do ensure that neither the reader nor the writer accesses any member of the buffer object without holding the mutex locked. As the code is presently structured, that will synchronize access not only to the buffer structure itself, but also to the data in the nodes, which presently are involved in their own data races.


Now note that here ...

    while (buffer->head == NULL){

        pthread_cond_wait(&empty, &mutex); // Wait until buffer is not empty

        if (data->id == 0){ // end-of-stream
            pthread_mutex_unlock(&mutex);
            return SBUFFER_NO_DATA;
        }
    }

... you are testing for an end-of-data marker before actually reading an item from the buffer. It looks like that's useless in practice. In prarticular, it does not prevent the end-of-stream item from being removed from the buffer, so only one reader will see it. The other(s) will then end up waiting indefinitely for data that will never arrive.


Next, consider this code executed by the readers:

    // while the count is 0, wait
    pthread_mutex_lock(&mutex);
    while (count > 0) {
        pthread_cond_wait(&removing, &mutex);
    }
    pthread_mutex_unlock(&mutex);

Note well that the reader unlocks the mutex while count is 0, so there is an opportunity for another reader to reach the while loop and pass through. I'm not sure that two threads both getting past that point at the same time produces a problem in practice, but the point seems to be to avoid that, so do it right: move the count from later in the function to right after the while loop, prior to unlocking the mutex.

Alternatively, once you've done that, it should be clear(er) that you've effectively hand-rolled a binary semaphore. You could simplify your code by switching to an actual POSIX semaphore for this purpose. Or if you want to continue with a mutex CV for this, then consider using a different mutex, as the data to be protected for this purpose are disjoint from the buffer and its contents. That would get rid of the weirdness of re-locking the mutex immediately after unlocking it.

Or on the third hand, consider whether you need to do any of that at all. How is the (separate) mutex protection of the rest of the body of sbuffer_remove() not sufficient by itself? I propose to you that it is sufficient. After all, you're presently using your hand-rolled semaphore exactly as (another) mutex.


The bones of this code seem reasonably good, so I don't think repairs will be too hard.

First, add the needed mutex protection in sbuffer_insert(). Or really, just expand the scope of the critical section that's already there:

int sbuffer_insert(sbuffer_t *buffer, sensor_data_t *data) {
    sbuffer_node_t *dummy;
    if (buffer == NULL) return SBUFFER_FAILURE;
    dummy = malloc(sizeof(sbuffer_node_t));
    if (dummy == NULL) return SBUFFER_FAILURE;
    dummy->data = *data;
    dummy->next = NULL;

    pthread_mutex_lock(&mutex);
    if (buffer->tail == NULL) // buffer empty (buffer->head should also be NULL
    {
        assert(buffer->head == NULL);
        buffer->head = buffer->tail = dummy;
        pthread_cond_signal(&empty); // Signal that buffer is not empty

    } else // buffer not empty
    {
        buffer->tail->next = dummy;
        buffer->tail = buffer->tail->next;
    }
    pthread_mutex_unlock(&mutex);

    return SBUFFER_SUCCESS;
}

Second, simplify and fix sbuffer_remove():

int sbuffer_remove(sbuffer_t *buffer, sensor_data_t *data) {
    if (buffer == NULL) {
        return SBUFFER_FAILURE;
    }

    pthread_mutex_lock(&mutex);

    // Wait until the buffer is nonempty
    while (buffer->head == NULL) {
        pthread_cond_wait(&empty, &mutex);
    }

    // Copy the first item from the buffer
    *data = buffer->head->data;

    if (data->id == 0) {
        // end-of-stream: leave the item in the buffer for other readers to see
        pthread_mutex_unlock(&mutex);
        return SBUFFER_NO_DATA;
    } // else remove the item

    sbuffer_node_t *dummy = buffer->head;

    buffer->head = buffer->head->next;
    if (buffer->head == NULL) {
        // buffer is now empty
        buffer->tail = NULL;
    }

    pthread_mutex_unlock(&mutex);

    free(dummy);

    return SBUFFER_SUCCESS;
}

CodePudding user response:

Not a complete answer here, but I see this in your sbuffer_remove function:

    // while the count is 0, wait
    pthread_mutex_lock(&mutex);
    while (count > 0) {
        pthread_cond_wait(&removing, &mutex);
    }
    pthread_mutex_unlock(&mutex);

That looks suspicious to me. What is the purpose of waiting for the count to become zero? Your code waits for the count to become zero, but then it does nothing else before it unlocks the mutex.

I don't know what count represents, but if the other reader thread is concurrently manipulating it, then there is no guarantee that it will still be zero once you've unlocked the mutex.

But, maybe that hasn't caused a problem for you because...


...This also looks suspicious:

    pthread_mutex_unlock(&mutex);

    pthread_mutex_lock(&mutex);

Why do you unlock the mutex and immediately lock it again? Are you thinking that will afford the other consumer a chance to lock it? Technically speaking, it does that, but in practical terms, the chance it offers is known as, "a snowball's chance in Hell." If thread A is waiting for a mutex that is locked by thread B, and thread B unlocks and then immediately tries to re-lock, then in most languages/libraries/operating systems, thread B will almost always succeed while thread A goes back to try again.

Mutexes work best when they are rarely contested. If you have a program in which threads spend any significant amount of time waiting for mutexes, then you probably are not getting much benefit from using multiple threads.

  • Related