I'm writing a program which receive data from websocket and work with this data in thread pool. I have problem with pthread_cond_wait when processor have 2 or more cores. After pthread_cond_signal signal is received by all threads which run on different cores. For example if I have 2 cores, then the signal will come to 2 threads at once, which are located on these two cores. If I have single core processor all is good. What I have to do to get the program to work correctly on multi-core processors? So that only one thread receives the signal to start work. I wrote an example of my code with generation random text data instead of websocket data.
#include<stdio.h>
#include<stdlib.h>
#include<cstring>
#include<pthread.h>
#include<unistd.h>
pthread_attr_t attrd;
pthread_mutex_t mutexQueue;
pthread_cond_t condQueue;
char textArr[128][24]; //array with random text to work
int tc; //tasks count
int gi; //global array index
void *workThread(void *args){
int ai;//internal index for working array element
while(1){
pthread_mutex_lock(&mutexQueue);
while(tc==0){
pthread_cond_wait(&condQueue,&mutexQueue); //wait for signal if tasks count = 0.
}
ai=gi;
if(gi==127)gi=0;else gi ;
tc--;
pthread_mutex_unlock(&mutexQueue);
printf("%s\r\n",textArr[ai]);
// then work with websocket data
}
}
void *generalThread(void *args){
const char chrs[]="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; //chars fo random text generation
int ai=0;
srand(time(NULL));
while(1){
for(int i=0;i<23;i )textArr[ai][i]=chrs[rand()%61];//generating data instead of websocket data
textArr[ai][23]='\0';
tc ;
pthread_cond_signal(&condQueue); //Send signal for thread to begin work with data
if(ai==127)ai=0;else ai ;
}
}
int main(int argc,char *argv[]){
pthread_attr_init(&attrd);
pthread_attr_setdetachstate(&attrd,PTHREAD_CREATE_DETACHED);
pthread_t gt,wt[32];
for(int i=0;i<32;i )pthread_create(&wt[i],&attrd,&workThread,NULL);
pthread_create(>,NULL,&generalThread,NULL);
pthread_join(gt,NULL);
return 0;
}
CodePudding user response:
First some info:
Rationale
Some implementations, particularly on a multi-processor, may sometimes cause multiple threads to wake up when the condition variable is signaled simultaneously on different processors.
Rationale
Multiple Awakenings by Condition Signal
On a multi-processor, it may be impossible for an implementation of pthread_cond_signal() to avoid the unblocking of more than one thread blocked on a condition variable.
...
The effect is that more than one thread can return from its call to pthread_cond_wait() or pthread_cond_timedwait() as a result of one call to pthread_cond_signal(). This effect is called "spurious wakeup". Note that the situation is self-correcting in that the number of threads that are so awakened is finite; for example, the next thread to call pthread_cond_wait() after the sequence of events above blocks.
So far, so good, the code in your workThread
is proper synchronized (but you should put the printf
in the synchronized section as well) but the code in your generalThread
has no synchronization at all. Encapsulate the code in the while loop with a lock / unlock.
In that case, the first awakened thread has to aquire a lock on the specified mutex, which will be owned by either another thread or the generalThread
. Until the mutex is unlocked, the thread blocks (no matter the reason of its wakeup). After the aquisition, it owns the mutex and all other threads will be blocked, the generalThread
inclusive.
Note: a pthread_cond_wait
implicitly unlocks the specified mutex upon entering the wait state and on a wakeup it tries to aquire a lock on the specified mutex.
CodePudding user response:
Adding a mutex lock to tc
fully corrects my programs:
void *generalThread(void *args) {
const char chrs[]="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
int ai=0;
srand(time(NULL));
while(1){
for(int i=0;i<23;i )textArr[ai][i]=chrs[rand()%61];
textArr[ai][23]='\0';
pthread_mutex_lock(&mutexQueue); //this has been added
tc ;
pthread_mutex_unlock(&mutexQueue); //this has been added
pthread_cond_signal(&condQueue);
if(ai==127)ai=0;else ai ;
}
}