in below code snippet it looks like notification from producer thread to consumer thread is not reaching once producer produce an single item and due to this behavior once producer has finished generating items equivalent to buffer size then only consumer has started consuming items . Can anybody suggest How we should approach to fix this issue using semaphore.
#include <iostream>
#include <queue>
#include <semaphore.h>
#include <thread>
#include <functional>
const int BUFFER_SIZE = 3;
class Buffer {
public:
sem_t sem_full;
sem_t sem_empty;
std::queue<int> buffer;
Buffer() {
sem_init(&sem_full, 0, BUFFER_SIZE);
sem_init(&sem_empty, 0, 0);
}
void producer() {
while (true) {
sem_wait(&sem_full);
int item = rand() % 10;
buffer.push(item);
std::cout << "Producer added " << item << std::endl;
sem_post(&sem_empty);
if (buffer.size() == BUFFER_SIZE) {
std::cout << "Buffer is full, terminating producer thread" << std::endl;
return;
}
}
}
void consumer() {
while (true) {
sem_wait(&sem_empty);
int item = buffer.front();
buffer.pop();
std::cout << "Consumer removed " << item << std::endl;
sem_post(&sem_full);
if (buffer.empty()) {
std::cout << "Buffer is empty, terminating consumer thread" << std::endl;
return;
}
}
}
};
int main() {
Buffer buffer;
std::thread producer(std::bind(&Buffer::producer, &buffer));
std::thread consumer(std::bind(&Buffer::consumer, &buffer));
producer.join();
consumer.join();
return 0;
}
CodePudding user response:
you need to use binary semaphore here to achieve this behavior without using condition variable to synchronize this.
#include <iostream>
#include <queue>
#include <semaphore.h>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <atomic>
const int BUFFER_SIZE = 4;
class Buffer {
public:
sem_t sem_full;
sem_t sem_empty;
std::queue<int> buffer;
std::condition_variable cv;
std::mutex m;
int buffer_full_count {0};
Buffer() {
sem_init(&sem_full, 0, 1);
sem_init(&sem_empty, 0, 0);
}
void producer() {
while (true) {
sem_wait(&sem_full);
if (buffer_full_count == BUFFER_SIZE) {
std::cout << "Buffer is full, terminating producer thread" << std::endl;
return;
}
std::unique_lock <std::mutex> lock(m);
int item = rand() % 10;
buffer.push(item);
buffer_full_count ;
std::cout << "Producer added " << item << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
sem_post(&sem_empty);
}
}
void consumer() {
while (buffer_full_count != BUFFER_SIZE) {
sem_wait(&sem_empty);
std::unique_lock <std::mutex> lock(m);
int item = buffer.front();
buffer.pop();
std::cout << "Consumer removed " << item << std::endl;
sem_post(&sem_full);
}
}
};
int main() {
Buffer buffer;
std::thread producer(std::bind(&Buffer::producer, &buffer));
std::thread consumer(std::bind(&Buffer::consumer, &buffer));
producer.join();
consumer.join();
return 0;
}