Despite the number of similar questions on Stackoverflow, I can't come up with a solution for the following Producer-Consumer problem:
My program has three threads:
One writer thread that reads from a file, saves data into a
sensor_data_t
struct, and writes it into a dynamic pointer-based buffer usingsbuffer_insert(buffer, &sensor_data)
. Once this thread finishes reading it sends an end-of-stream data struct represented bydata->id == 0
.Two reader threads that remove data from the buffer head (FIFO-style), and store it into a temporary data struct using
sbuffer_remove(buffer, &data)
and then print it to the cmd line for testing purposes.
I think I have to avoid at the least:
- My reader threads to try to consume/remove from the buffer while it is empty.
- My reader threads to consume/remove from the buffer at the same time.
On the other hand, I don't think my writer thread in sbuffer_insert()
needs to worry if the readers are changing the head
because it only appends to the tail
. Is this reasoning correct or am I missing something?
Here's what I've done so far:
My main function:
sbuffer_t *buffer;
void *writer(void *fp);
void *reader(void *fp);
int main()
{
// Initialize the buffer
sbuffer_init(&buffer);
// Open sensor_data file
FILE *sensor_data_fp;
sensor_data_fp = fopen("sensor_data", "rb");
// Start thread for reading sensor_data file adding elements to the sbuffer
pthread_t writer_thread;
pthread_create(&writer_thread, NULL, &writer, sensor_data_fp);
// Open sensor_data_out file
FILE *sensor_data_out_fp;
sensor_data_out_fp = fopen("sensor_data_out", "w");
// Start thread 1 and 2 for writing sensor_data_out file
pthread_t reader_thread1;
pthread_t reader_thread2;
pthread_create(&reader_thread1, NULL, &reader, sensor_data_out_fp);
pthread_create(&reader_thread2, NULL, &reader, sensor_data_out_fp);
// Wait for threads to finish and join them
pthread_join(reader_thread1, NULL);
pthread_join(reader_thread2, NULL);
pthread_join(writer_thread, NULL);
// Close sensor_data file
fclose(sensor_data_fp);
// Close sensor_data_out file
fclose(sensor_data_out_fp);
// free buffer
sbuffer_free(&buffer);
return 0;
}
My reader and writer threads:
typedef uint16_t sensor_id_t;
typedef double sensor_value_t;
typedef time_t sensor_ts_t; // UTC timestamp as returned by time() - notice that the size of time_t is different on 32/64 bit machine
typedef struct {
sensor_id_t id;
sensor_value_t value;
sensor_ts_t ts;
} sensor_data_t;
void *writer(void *fp)
{
// cast fp to FILE
FILE *sensor_data_fp = (FILE *)fp;
// make char buffers of size sensor_id_t, sensor_value_t and sensor_ts_t
char sensor_id_buffer[sizeof(sensor_id_t)];
char sensor_value_buffer[sizeof(sensor_value_t)];
char sensor_ts_buffer[sizeof(sensor_ts_t)];
// parse sensor_data file into sensor_id_buffer, sensor_value_buffer and sensor_ts_buffer
while(fread(sensor_id_buffer, sizeof(sensor_id_t), 1, sensor_data_fp) == 1 &&
fread(sensor_value_buffer, sizeof(sensor_value_t), 1, sensor_data_fp) == 1 &&
fread(sensor_ts_buffer, sizeof(sensor_ts_t), 1, sensor_data_fp)) {
// create sensor_data_t
sensor_data_t sensor_data;
// copy sensor_id_buffer to sensor_data.id
memcpy(&sensor_data.id, sensor_id_buffer, sizeof(sensor_id_t));
// copy sensor_value_buffer to sensor_data.value
memcpy(&sensor_data.value, sensor_value_buffer, sizeof(sensor_value_t));
// copy sensor_ts_buffer to sensor_data.ts
memcpy(&sensor_data.ts, sensor_ts_buffer, sizeof(sensor_ts_t));
// print sensor_data for testing
// printf("sensor_data.id: %d, sensor_data.value: %f, sensor_data.ts: %ld\n", sensor_data.id, sensor_data.value, sensor_data.ts);
// insert sensor_data into buffer
sbuffer_insert(buffer, &sensor_data);
}
// Add dummy data to buffer to signal end of file
sensor_data_t sensor_data;
sensor_data.id = 0;
sensor_data.value = 0;
sensor_data.ts = 0;
sbuffer_insert(buffer, &sensor_data);
return NULL;
}
void *reader(void *fp)
{
// cast fp to FILE
//FILE *sensor_data_out_fp = (FILE *)fp;
// Init data as sensor_data_t
sensor_data_t data;
do{
// read data from buffer
if (sbuffer_remove(buffer, &data) == 0) { // SBUFFER_SUCCESS 0
// write data to sensor_data_out file
// fwrite(&data, sizeof(sensor_data_t), 1, sensor_data_out_fp);
// print data for testing
printf("data.id: %d, data.value: %f, data.ts: %ld \n", data.id, data.value, data.ts);
}
}
while(data.id != 0);
// free allocated memory
// free(fp);
return NULL;
}
Global variables and buffer initialization:
typedef struct sbuffer_node {
struct sbuffer_node *next;
sensor_data_t data;
} sbuffer_node_t;
struct sbuffer {
sbuffer_node_t *head;
sbuffer_node_t *tail;
};
pthread_mutex_t mutex;
pthread_cond_t empty, removing;
int count = 0; // reader count
int sbuffer_init(sbuffer_t **buffer) {
*buffer = malloc(sizeof(sbuffer_t));
if (*buffer == NULL) return SBUFFER_FAILURE;
(*buffer)->head = NULL;
(*buffer)->tail = NULL;
// Initialize mutex and condition variables
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&removing, NULL);
return SBUFFER_SUCCESS;
}
sbuffer_remove
(Consumer)
int sbuffer_remove(sbuffer_t *buffer, sensor_data_t *data) {
sbuffer_node_t *dummy;
if (buffer == NULL) return SBUFFER_FAILURE;
// while the count is 0, wait
pthread_mutex_lock(&mutex);
while (count > 0) {
pthread_cond_wait(&removing, &mutex);
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while (buffer->head == NULL){
pthread_cond_wait(&empty, &mutex); // Wait until buffer is not empty
if (data->id == 0){ // end-of-stream
pthread_mutex_unlock(&mutex);
return SBUFFER_NO_DATA;
}
}
count ;
*data = buffer->head->data;
dummy = buffer->head;
if (buffer->head == buffer->tail) // buffer has only one node
{
buffer->head = buffer->tail = NULL;
} else // buffer has many nodes empty
{
buffer->head = buffer->head->next;
}
free(dummy);
count--;
pthread_cond_signal(&removing); // Signal that data is removed
pthread_mutex_unlock(&mutex);
return SBUFFER_SUCCESS;
}
sbuffer_insert
(Producer)
int sbuffer_insert(sbuffer_t *buffer, sensor_data_t *data) {
sbuffer_node_t *dummy;
if (buffer == NULL) return SBUFFER_FAILURE;
dummy = malloc(sizeof(sbuffer_node_t));
if (dummy == NULL) return SBUFFER_FAILURE;
dummy->data = *data;
dummy->next = NULL;
if (buffer->tail == NULL) // buffer empty (buffer->head should also be NULL
{
pthread_mutex_lock(&mutex);
buffer->head = buffer->tail = dummy;
pthread_cond_signal(&empty); // Signal that buffer is not empty
pthread_mutex_unlock(&mutex);
} else // buffer not empty
{
buffer->tail->next = dummy;
buffer->tail = buffer->tail->next;
}
return SBUFFER_SUCCESS;
}
Currently, the code has very unstable behavior. Sometimes it runs and prints everything, sometimes it doesn't print anything and gets stuck in a loop, sometimes it prints everything but the last value comes after the end-of-stream code and it doesn't terminate.
I would really appreciate a solution that explains what I'm doing wrong or a comment that redirects me to a duplicate of my question.
CodePudding user response:
I think I have to avoid at the least:
- My reader threads to try to consume/remove from the buffer while it is empty.
- My reader threads to consume/remove from the buffer at the same time.
Yes, you must avoid those. And more.
On the other hand, I don't think my writer thread in
sbuffer_insert()
needs to worry if the readers are changing thehead
because it only appends to thetail
. Is this reasoning correct or am I missing something?
You are missing at least that
when the buffer contains fewer than two nodes, there is no distinction between the head node and the tail node. This manifests at the code level at least in the fact that your
sbuffer_insert()
andsbuffer_remove()
functions both access bothbuffer->head
andbuffer->tail
. From the perspective of synchronization requirements, it is this lower-level view that matters.Insertion and removal modifies the node objects themselves, not just the overall buffer object.
Synchronization is not just, nor even primarily, about avoiding threads directly interfering with each other. It is even more about the consistency of different threads' views of memory. You need appropriate synchronization to ensure that one thread's writes to memory are (ever) observed by other threads, and to establish ordering relationships among operations on memory by different threads.
Currently, the code has very unstable behavior. Sometimes it runs and prints everything, sometimes it doesn't print anything and gets stuck in a loop, sometimes it prints everything but the last value comes after the end-of-stream code and it doesn't terminate.
This is unsurprising, because your program contains data races, and its behavior is therefore undefined.
Do ensure that neither the reader nor the writer accesses any member of the buffer object without holding the mutex locked. As the code is presently structured, that will synchronize access not only to the buffer structure itself, but also to the data in the nodes, which presently are involved in their own data races.
Now note that here ...
while (buffer->head == NULL){ pthread_cond_wait(&empty, &mutex); // Wait until buffer is not empty if (data->id == 0){ // end-of-stream pthread_mutex_unlock(&mutex); return SBUFFER_NO_DATA; } }
... you are testing for an end-of-data marker before actually reading an item from the buffer. It looks like that's useless in practice. In prarticular, it does not prevent the end-of-stream item from being removed from the buffer, so only one reader will see it. The other(s) will then end up waiting indefinitely for data that will never arrive.
Next, consider this code executed by the readers:
// while the count is 0, wait pthread_mutex_lock(&mutex); while (count > 0) { pthread_cond_wait(&removing, &mutex); } pthread_mutex_unlock(&mutex);
Note well that the reader unlocks the mutex while count
is 0, so there is an opportunity for another reader to reach the while
loop and pass through. I'm not sure that two threads both getting past that point at the same time produces a problem in practice, but the point seems to be to avoid that, so do it right: move the count
from later in the function to right after the while
loop, prior to unlocking the mutex.
Alternatively, once you've done that, it should be clear(er) that you've effectively hand-rolled a binary semaphore. You could simplify your code by switching to an actual POSIX semaphore for this purpose. Or if you want to continue with a mutex CV for this, then consider using a different mutex, as the data to be protected for this purpose are disjoint from the buffer and its contents. That would get rid of the weirdness of re-locking the mutex immediately after unlocking it.
Or on the third hand, consider whether you need to do any of that at all. How is the (separate) mutex protection of the rest of the body of sbuffer_remove()
not sufficient by itself? I propose to you that it is sufficient. After all, you're presently using your hand-rolled semaphore exactly as (another) mutex.
The bones of this code seem reasonably good, so I don't think repairs will be too hard.
First, add the needed mutex protection in sbuffer_insert()
. Or really, just expand the scope of the critical section that's already there:
int sbuffer_insert(sbuffer_t *buffer, sensor_data_t *data) {
sbuffer_node_t *dummy;
if (buffer == NULL) return SBUFFER_FAILURE;
dummy = malloc(sizeof(sbuffer_node_t));
if (dummy == NULL) return SBUFFER_FAILURE;
dummy->data = *data;
dummy->next = NULL;
pthread_mutex_lock(&mutex);
if (buffer->tail == NULL) // buffer empty (buffer->head should also be NULL
{
assert(buffer->head == NULL);
buffer->head = buffer->tail = dummy;
pthread_cond_signal(&empty); // Signal that buffer is not empty
} else // buffer not empty
{
buffer->tail->next = dummy;
buffer->tail = buffer->tail->next;
}
pthread_mutex_unlock(&mutex);
return SBUFFER_SUCCESS;
}
Second, simplify and fix sbuffer_remove()
:
int sbuffer_remove(sbuffer_t *buffer, sensor_data_t *data) {
if (buffer == NULL) {
return SBUFFER_FAILURE;
}
pthread_mutex_lock(&mutex);
// Wait until the buffer is nonempty
while (buffer->head == NULL) {
pthread_cond_wait(&empty, &mutex);
}
// Copy the first item from the buffer
*data = buffer->head->data;
if (data->id == 0) {
// end-of-stream: leave the item in the buffer for other readers to see
pthread_mutex_unlock(&mutex);
return SBUFFER_NO_DATA;
} // else remove the item
sbuffer_node_t *dummy = buffer->head;
buffer->head = buffer->head->next;
if (buffer->head == NULL) {
// buffer is now empty
buffer->tail = NULL;
}
pthread_mutex_unlock(&mutex);
free(dummy);
return SBUFFER_SUCCESS;
}
CodePudding user response:
Not a complete answer here, but I see this in your sbuffer_remove
function:
// while the count is 0, wait
pthread_mutex_lock(&mutex);
while (count > 0) {
pthread_cond_wait(&removing, &mutex);
}
pthread_mutex_unlock(&mutex);
That looks suspicious to me. What is the purpose of waiting for the count
to become zero? Your code waits for the count to become zero, but then it does nothing else before it unlocks the mutex.
I don't know what count
represents, but if the other reader thread is concurrently manipulating it, then there is no guarantee that it will still be zero once you've unlocked the mutex.
But, maybe that hasn't caused a problem for you because...
...This also looks suspicious:
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
Why do you unlock the mutex
and immediately lock it again? Are you thinking that will afford the other consumer a chance to lock it? Technically speaking, it does that, but in practical terms, the chance it offers is known as, "a snowball's chance in Hell." If thread A is waiting for a mutex that is locked by thread B, and thread B unlocks and then immediately tries to re-lock, then in most languages/libraries/operating systems, thread B will almost always succeed while thread A goes back to try again.
Mutexes work best when they are rarely contested. If you have a program in which threads spend any significant amount of time waiting for mutexes, then you probably are not getting much benefit from using multiple threads.