I'm trying to code a multithreaded application to feed images to my machine learning models. Initially I had only one consumer thread (image processing) per producer thread (in charge of image acquisition and of feeding images to the buffer when there is empty space). Now I need two consumers to get all the images in the buffer.
For the single producer-consumer case I used one mutex and two semaphores to make sure the buffer doesn't grow beyond the maximum size I've established. The code is something like this:
```cpp
// Before starting the threads:
sem_init(&waitPop, 0, bufferLimitSize); // waitPop is used to limit the size of the buffer
sem_init(&waitPush, 0, 0);              // waitPush is used to avoid trying to load from an empty buffer

void *producer(void *) {
    while (thereAreImages) {
        image = loadimage();
        sem_wait(&waitPop);
        mutex.lock();
        // This part may be unnecessary or not well coded, but adds more safety on limiting buffer size
        if (buffer.size() >= bufferLimitSize) {
            mutex.unlock();
            mutex.lock();
        }
        buffer.push_front(image);
        mutex.unlock();
        sem_post(&waitPush);
    }
    return nullptr;
}

void *consumer(void *) {
    while (thereAreImages || !buffer.empty()) {
        sem_wait(&waitPush);
        mutex.lock();
        data = buffer.back();
        image = data.clone();
        buffer.pop_back();
        mutex.unlock();
        sem_post(&waitPop);
        doImageProcessing(image);
    }
    return nullptr;
}
```
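For reference, the same single producer/single consumer scheme can be sketched as a self-contained program. This is a minimal sketch under stated assumptions, not the asker's actual code: integers stand in for images, a small hypothetical `Semaphore` class built on `std::mutex`/`std::condition_variable` stands in for POSIX `sem_t`, and the consumer stops after a known image count instead of testing `thereAreImages || !buffer.empty()` (which reads the buffer without holding the mutex). `runSingle` and `SingleResult` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Counting semaphore from a mutex + condition variable, standing in for POSIX
// sem_t: wait() plays the role of sem_wait, post() the role of sem_post.
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    void post() {
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

struct SingleResult {
    std::vector<int> processed;  // images in the order the consumer handled them
    std::size_t maxSize = 0;     // largest buffer size seen right after a push
};

// Hypothetical driver: ints stand in for images.
SingleResult runSingle(int numImages, int bufferLimitSize) {
    std::deque<int> buffer;
    std::mutex mutex;
    Semaphore waitPop(bufferLimitSize);  // free slots: limits the buffer size
    Semaphore waitPush(0);               // filled slots: never pop an empty buffer
    SingleResult result;

    std::thread producer([&] {
        for (int image = 0; image < numImages; ++image) {
            waitPop.wait();              // blocks while the buffer is full
            {
                std::lock_guard<std::mutex> lock(mutex);
                buffer.push_front(image);
                if (buffer.size() > result.maxSize) result.maxSize = buffer.size();
            }
            waitPush.post();
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i < numImages; ++i) {   // known count instead of thereAreImages
            waitPush.wait();             // blocks while the buffer is empty
            int image;
            {
                std::lock_guard<std::mutex> lock(mutex);
                image = buffer.back();   // oldest image, since the producer pushes to the front
                buffer.pop_back();
            }
            waitPop.post();
            result.processed.push_back(image);  // stands in for doImageProcessing(image)
        }
    });

    producer.join();
    consumer.join();
    return result;
}
```

Because `waitPop` starts at the limit and is only posted after a `pop_back()`, the buffer can never hold more than `bufferLimitSize` images, which makes the extra unlock/lock check in the producer redundant in this single-producer setup.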
This worked fine for me and I've tested it a lot, but now I need one more consumer thread processing all the images that go into the buffer. Basically, I need both consumers to process the same images even though their processing times are not the same. I've been wondering for some days how I could manage that using as few resources as possible, and I haven't found a way that doesn't make the buffer bigger than desired.
To summarize, I need both consumers to clone every single image that goes into the buffer, while never feeding more images into the buffer than the established limit and never trying to access the buffer when it is empty.
Thanks in advance!
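One way to meet these requirements without busy-waiting, offered as an alternative design rather than as the asker's method, is to keep each image in the buffer until the slowest consumer has read it, with a separate read position per consumer. A minimal sketch, assuming integers stand in for images and a plain copy stands in for `clone()`; `BroadcastBuffer`, `runBroadcast`, `BroadcastResult` and the `close()` end-of-stream call are hypothetical names, and a single `std::condition_variable` replaces the two semaphores.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical bounded "broadcast" buffer: every consumer reads every image,
// and an image is removed only after the slowest consumer has read it.
class BroadcastBuffer {
public:
    BroadcastBuffer(std::size_t limit, std::size_t numConsumers)
        : limit_(limit), next_(numConsumers) {}

    // Producer side: blocks while the buffer holds `limit` images.
    void push(int image) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return items_.size() < limit_; });
        items_.push_back(image);
        maxSize_ = std::max(maxSize_, items_.size());
        cv_.notify_all();                     // wake consumers waiting for data
    }

    // Producer side: no more images will be pushed.
    void close() {
        std::lock_guard<std::mutex> lock(m_);
        closed_ = true;
        cv_.notify_all();
    }

    // Consumer side: copies the next unread image into `image`. Returns false
    // once the buffer is closed and this consumer has read everything.
    bool pop(std::size_t consumer, int &image) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [&] { return next_[consumer] < base_ + items_.size() || closed_; });
        if (next_[consumer] == base_ + items_.size()) return false;  // closed and drained
        image = items_[next_[consumer] - base_];  // the copy plays the role of clone()
        ++next_[consumer];
        // Drop front images that every consumer has already read.
        std::size_t slowest = *std::min_element(next_.begin(), next_.end());
        bool freed = false;
        while (base_ < slowest) {
            items_.pop_front();
            ++base_;
            freed = true;
        }
        if (freed) cv_.notify_all();          // wake the producer waiting for space
        return true;
    }

    std::size_t maxSize() const { return maxSize_; }  // read only after joining all threads

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<int> items_;
    std::size_t base_ = 0;            // sequence number of items_.front()
    std::size_t limit_;
    std::vector<std::size_t> next_;   // next sequence number each consumer will read
    std::size_t maxSize_ = 0;
    bool closed_ = false;
};

struct BroadcastResult {
    std::vector<std::vector<int>> seen;  // what each consumer processed, in order
    std::size_t maxSize = 0;
};

// Hypothetical driver: one producer, `numConsumers` consumers, ints as images.
BroadcastResult runBroadcast(int numImages, std::size_t limit, std::size_t numConsumers) {
    BroadcastBuffer buffer(limit, numConsumers);
    BroadcastResult result;
    result.seen.resize(numConsumers);
    std::vector<std::thread> consumers;
    for (std::size_t c = 0; c < numConsumers; ++c) {
        consumers.emplace_back([&buffer, &result, c] {
            int image;
            while (buffer.pop(c, image))
                result.seen[c].push_back(image);  // stands in for doImageProcessing(image)
        });
    }
    for (int image = 0; image < numImages; ++image) buffer.push(image);
    buffer.close();
    for (auto &t : consumers) t.join();
    result.maxSize = buffer.maxSize();
    return result;
}
```

Since the producer waits on `items_.size() < limit_`, the buffer never exceeds the limit, and a consumer that runs ahead simply sleeps on the condition variable instead of polling. Adding a third consumer only changes `numConsumers`.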
Answer:
I solved my own problem and I'll write it here in case you find it useful. Adding only two global boolean variables and being careful with the semaphores I was already using works fine for me. The producer is the same as before, and both consumers have the same code, differing only in the boolean variable. These variables are initially false and record whether each thread has already cloned the last image in the buffer. Let's say the functions are called consumer1 and consumer2, and each one has a boolean, signal1 and signal2 respectively, to signal that it has cloned the image.
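```cpp
bool signal1 = false; // true once consumer1 has cloned the image at buffer.back()
bool signal2 = false; // true once consumer2 has cloned the image at buffer.back()

// consumer2 is identical, with signal1 and signal2 swapped
void *consumer1(void *) {
    while (thereAreImages || !buffer.empty()) {
        sem_wait(&waitPush);
        mutex.lock();
        if (buffer.empty()) {     // woken by the other consumer's final sem_post: nothing left
            mutex.unlock();
            break;
        }
        if (signal1) {
            printf("This image has already been processed\n");
            sem_post(&waitPush);  // give the token back so consumer2 can take it
            mutex.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // avoid using normal sleep() in multithreading!
            continue;
        }
        data = buffer.back();
        image = data.clone();
        signal1 = true;
        if (signal1 && signal2) {
            // Both consumers have cloned this image: remove it and free a slot
            buffer.pop_back();
            signal1 = signal2 = false;
            sem_post(&waitPop);
            mutex.unlock();
        } else {
            // Only this consumer has cloned it: re-post so the other consumer can get it
            sem_post(&waitPush);
            mutex.unlock();
        }
        doImageProcessing(image);
    }
    sem_post(&waitPush); // To avoid semaphore deadlock when one thread finishes first
    return nullptr;
}
```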
The boolean variables are always read and written while the mutex is locked, so there shouldn't be any data race on them. Hope you find it useful, and please let me know if you think of any way to improve it.
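To try the two-flag scheme end to end, a self-contained sketch could look like the following. Assumptions, all hypothetical: integers stand in for images and a copy for `clone()`; a small `Semaphore` class on `std::mutex`/`std::condition_variable` stands in for POSIX `sem_t`; the loop test `thereAreImages || !buffer.empty()` is replaced by one extra end-of-stream post on `waitPush` from the producer; the retry sleep is shortened to 1 ms; `runTwoConsumers` and `TwoConsumerResult` are invented names.

```cpp
#include <array>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Counting semaphore from a mutex + condition variable (stands in for sem_t).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    void post() {
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

struct TwoConsumerResult {
    std::array<std::vector<int>, 2> seen;  // what consumer1 / consumer2 processed, in order
    std::size_t maxSize = 0;               // largest buffer size seen right after a push
};

// Hypothetical driver for the two-flag scheme; ints stand in for images.
TwoConsumerResult runTwoConsumers(int numImages, int bufferLimitSize) {
    std::deque<int> buffer;
    std::mutex mutex;
    Semaphore waitPop(bufferLimitSize), waitPush(0);
    std::array<bool, 2> signals{{false, false}};  // signal1, signal2
    TwoConsumerResult result;

    auto consumer = [&](std::size_t id) {
        while (true) {
            waitPush.wait();
            std::unique_lock<std::mutex> lock(mutex);
            if (buffer.empty()) {          // only happens after the end-of-stream token
                lock.unlock();
                waitPush.post();           // let the other consumer see it and exit too
                return;
            }
            if (signals[id]) {             // already cloned this image: give the token back
                lock.unlock();
                waitPush.post();
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
                continue;
            }
            int image = buffer.back();     // "clone" the oldest image
            signals[id] = true;
            if (signals[0] && signals[1]) { // both consumers have it: free the slot
                buffer.pop_back();
                signals[0] = signals[1] = false;
                lock.unlock();
                waitPop.post();
            } else {                       // re-post so the other consumer can clone it
                lock.unlock();
                waitPush.post();
            }
            result.seen[id].push_back(image);  // stands in for doImageProcessing(image)
        }
    };

    std::thread c1(consumer, 0), c2(consumer, 1);
    for (int image = 0; image < numImages; ++image) {
        waitPop.wait();
        {
            std::lock_guard<std::mutex> lock(mutex);
            buffer.push_front(image);
            if (buffer.size() > result.maxSize) result.maxSize = buffer.size();
        }
        waitPush.post();
    }
    waitPush.post();                       // end-of-stream token (replaces thereAreImages)
    c1.join();
    c2.join();
    return result;
}
```

Each consumer still receives every image in production order, because the image at `buffer.back()` is popped only after both flags are set. The cost is that the faster consumer keeps re-posting and sleeping while it waits for the slower one.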