How can I have a single thread wait for n threads to complete a single execution cycle, and then repeat this for i cycles? The solution needs to use semaphores to signal the single thread that all n threads have completed an execution cycle. The single thread should then signal all n threads that they can carry on with one more execution, and so on.
Some challenges I can't fix:
- Controlling the semaphores so that one of the n threads doesn't eat up more than one cycle (semaphore post/wait).
- The n number of workers is only known at run time, so we can't initialize a sem_t array[n].
Code:
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>

sem_t execute; // printer -> workers: permission to run a cycle
sem_t reports; // workers -> printer: a cycle has completed

// these would have been read in at run time
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)(intptr_t)args;
    printf("Worker id: %d\n", num);
    int proceed = 5; // simulates "5" jobs in this worker's queue
    while(proceed > 0){
        sem_wait(&execute);
        printf("Report from: %d\n", num);
        sem_post(&reports);
        proceed--;
    }
    control--;
    return NULL;
}

void *print(void* args){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute);
        }
    }
    return NULL;
}

int main(void){
    // control/n would have been scanned and passed to threads; the global
    // control var would be set after reading in n
    sem_init(&execute,0,n); // initialization of the first semaphore
    sem_init(&reports,0,0); // initialization of the second semaphore
    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);
    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        // pass the worker id by value inside the pointer argument
        pthread_create(&(workers[i]),NULL, &worker, (void*)(intptr_t)i);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }
    pthread_join(printer,NULL);
    return 0;
}
Output:
[rnitz]$ gcc -Wall -Werror -w -g example2.c -std=gnu99 -lpthread
[rnitz]$ ./a.out
Worker id: 0
Worker id: 3
Report from: 3
Report from: 3
Worker id: 2
Worker id: 1
Report from: 3
Report from: 0
All reported
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 0
Report from: 1
Report from: 3
Report from: 2
All reported
Report from: 0
Report from: 0
Report from: 1
Report from: 2
All reported
Report from: 2
Report from: 1
Report from: 1
Report from: 2
All reported
[rnitz]$
The issue is pretty obvious: some workers take more than one of the available "cycle" slots in the semaphore, so they report several times in one cycle while other workers don't report at all.
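To illustrate why this happens, here is a minimal sketch (the helper name tokens_taken_by_one_caller is made up for this example). A counting semaphore does not remember who took its tokens, so a single caller can drain all n of them without ever blocking:

```c
#include <semaphore.h>

/* Hypothetical helper: counts how many tokens ONE caller can take from a
   semaphore initialised to n before sem_trywait reports it is empty. */
int tokens_taken_by_one_caller(unsigned int n) {
    sem_t execute;
    int taken = 0;

    if (sem_init(&execute, 0, n) != 0)
        return -1; /* unnamed semaphores not supported on this platform */

    /* sem_trywait returns 0 each time it successfully takes a token. */
    while (sem_trywait(&execute) == 0)
        taken++;

    sem_destroy(&execute);
    return taken;
}
```

With n = 4 the single caller gets all 4 tokens, which is the same effect as one fast worker looping through sem_wait(&execute) several times before the other workers get scheduled.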
Note: This question is based on simulating a CPU, where n is the number of CPU threads, each working on its own independent job queue, while a single print thread handles printing the current job being processed for a given CPU cycle. The print thread will:
- Wait for all cpus to finish a cycle
- Print the current job on each cpu
- Signal the n cpus to complete another cycle.
CodePudding user response:
Solution: a sem_t* array of semaphores that is malloc'd at run time. I couldn't find an example of this anywhere, but I ended up trying it and it worked :)
I'm keeping the question because I can't find anything similar on Stack Overflow. Understandably, waiting for n threads to sync on every cycle probably isn't common.
For this, you can use arrays of semaphores, one semaphore per worker, instead of incrementing/decrementing a single shared semaphore. You need at least one semaphore array; this solution uses two.
This solution can be extended to worker threads that each have a different number of jobs to complete. You can keep a global int that is decremented when a thread finishes, and use it in the for-loops of the printer thread. That way, if 1 of the n threads finishes early because it has less work, the printer calls sem_wait()/sem_post() only n-1 times.
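#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

sem_t* execute_arr; // execute_arr[i]: printer tells worker i to run one cycle
sem_t* reports_arr; // reports_arr[i]: worker i tells the printer its cycle is done
pthread_mutex_t control_lock = PTHREAD_MUTEX_INITIALIZER; // protects control

// these both need to be the same value
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)(intptr_t)args;
    //printf("init arg: %d\n", num);
    int proceed = 5;
    while(proceed > 0){
        sem_wait(&execute_arr[num]);
        printf("Report from: %d\n", num);
        proceed--;
        if(proceed == 0){
            // decrement before the final report, so the printer cannot
            // start waiting for a report that will never arrive
            pthread_mutex_lock(&control_lock);
            control--;
            pthread_mutex_unlock(&control_lock);
        }
        sem_post(&reports_arr[num]);
    }
    return NULL;
}

void *print(void* args){
    int active = n; // workers that still have jobs left
    while(active > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports_arr[i]);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute_arr[i]);
        }
        pthread_mutex_lock(&control_lock);
        active = control;
        pthread_mutex_unlock(&control_lock);
    }
    return NULL;
}

int main(void){
    execute_arr = (sem_t*)malloc(n * sizeof(sem_t));
    reports_arr = (sem_t*)malloc(n * sizeof(sem_t));
    for(int i = 0; i < n; i++) {
        sem_init(&execute_arr[i], 0, 1); // the first cycle may start right away
        sem_init(&reports_arr[i], 0, 0);
    }
    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);
    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        // pass the worker id by value inside the pointer argument
        pthread_create(&(workers[i]),NULL, &worker, (void*)(intptr_t)i);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }
    pthread_join(printer,NULL);
    // destroy the semaphores and free the arrays
    for(int i = 0; i < n; i++) {
        sem_destroy(&execute_arr[i]);
        sem_destroy(&reports_arr[i]);
    }
    free(execute_arr);
    free(reports_arr);
    return 0;
}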
And the fixed output:
[rnitz]$ ./a.out
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
[rnitz]$
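The extension mentioned above, where workers have different numbers of jobs, could be sketched roughly as follows. This is only one possible approach under some assumptions: the names jobs, finished, active and run_cycles are invented for the example, the printer loop runs on the calling thread instead of a separate thread, a per-worker finished flag replaces the single global counter, and error handling is omitted.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* All names here (jobs, finished, run_cycles) are illustrative assumptions. */
static sem_t *execute_arr; /* printer -> worker i: run one more cycle */
static sem_t *reports_arr; /* worker i -> printer: cycle completed */
static const int *jobs;    /* jobs[i]: queue length of worker i */
static int *finished;      /* finished[i]: set by worker i before its last report */

static void *worker(void *arg) {
    int id = (int)(intptr_t)arg;
    for (int left = jobs[id]; left > 0; left--) {
        sem_wait(&execute_arr[id]);
        printf("Report from: %d\n", id);
        if (left == 1)
            finished[id] = 1; /* made visible to the printer by sem_post below */
        sem_post(&reports_arr[id]);
    }
    return NULL;
}

/* Runs the printer loop on the calling thread; returns the number of cycles. */
int run_cycles(int n, const int *job_counts) {
    pthread_t *workers = malloc(n * sizeof *workers);
    int *active = malloc(n * sizeof *active); /* printer's view of who still runs */
    int remaining = 0, cycles = 0;

    execute_arr = malloc(n * sizeof *execute_arr);
    reports_arr = malloc(n * sizeof *reports_arr);
    finished = calloc(n, sizeof *finished);
    jobs = job_counts;

    for (int i = 0; i < n; i++) {
        sem_init(&execute_arr[i], 0, 1); /* first cycle may start immediately */
        sem_init(&reports_arr[i], 0, 0);
        active[i] = job_counts[i] > 0;   /* workers with no jobs never report */
        remaining += active[i];
    }
    for (int i = 0; i < n; i++)
        pthread_create(&workers[i], NULL, worker, (void *)(intptr_t)i);

    while (remaining > 0) {
        /* Wait only for workers that still had jobs at the start of this cycle. */
        for (int i = 0; i < n; i++)
            if (active[i])
                sem_wait(&reports_arr[i]);
        printf("All reported\n");
        cycles++;
        /* Retire workers that just finished; release the others for one more cycle. */
        for (int i = 0; i < n; i++) {
            if (!active[i])
                continue;
            if (finished[i]) {
                active[i] = 0;
                remaining--;
            } else {
                sem_post(&execute_arr[i]);
            }
        }
    }

    for (int i = 0; i < n; i++) {
        pthread_join(workers[i], NULL);
        sem_destroy(&execute_arr[i]);
        sem_destroy(&reports_arr[i]);
    }
    free(workers); free(active); free(execute_arr); free(reports_arr); free(finished);
    return cycles;
}
```

For example, calling run_cycles(4, (int[]){3, 1, 2, 3}) runs three cycles: all four workers report in the first, three in the second, and two in the third. Each finished flag is written by exactly one worker before its sem_post and read by the printer only after the matching sem_wait, so no extra lock is needed in this sketch.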
CodePudding user response:
Rather than using a system semaphore, a mutex and a couple of condition variables work well for this type of problem.
I'll leave the unfinished work in main, initializing the global mutex and condition variables used in the code sample, as an exercise for you.
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

// main must initialize these (pthread_mutex_init / pthread_cond_init)
// before starting any thread
pthread_mutex_t mutex;
pthread_cond_t cv_thread; // signaled when job_number or exit_condition changes
pthread_cond_t cv_main;   // signaled when a worker completes a job
int job_number = 0;
int completions = 0;
int exit_condition = 0;
int n = 4; // number of worker threads

void *worker(void* args){
    int num = (int)(intptr_t)args;
    int lastJobNumber = 0;
    int must_exit = 0;
    while (1) {
        // wait for the main thread to indicate a new job is ready
        pthread_mutex_lock(&mutex);
        while ((lastJobNumber >= job_number) && !exit_condition) {
            // wait for job_number to change or for exit_condition to be set
            pthread_cond_wait(&cv_thread, &mutex);
        }
        must_exit = exit_condition;
        if (!must_exit) {
            lastJobNumber = job_number; // take on the new job!
        }
        pthread_mutex_unlock(&mutex);
        if (must_exit) {
            break;
        }
        printf("Report from: %d. This thread is executing job %d\n", num, lastJobNumber);
        pthread_mutex_lock(&mutex);
        completions++;
        pthread_mutex_unlock(&mutex);
        pthread_cond_broadcast(&cv_main); // signal to main to wake up
    }
    return NULL;
}

void *print(void* args) {
    for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&mutex);
        job_number++;
        completions = 0;
        // signal to threads a new job is ready
        pthread_cond_broadcast(&cv_thread);
        // wait for all threads to indicate completion
        while (completions < n) {
            pthread_cond_wait(&cv_main, &mutex);
        }
        pthread_mutex_unlock(&mutex);
        printf("All reported\n");
    }
    // signal all threads to exit
    pthread_mutex_lock(&mutex);
    exit_condition = 1;
    pthread_cond_broadcast(&cv_thread);
    pthread_mutex_unlock(&mutex);
    return NULL;
}
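One way the initialization left as an exercise could be filled in is sketched below. It is not the only way to do it. The assumptions are: the function name run_cycles_cv and the counter total_reports are invented for this example, the print loop runs on the calling thread rather than in its own thread, and error checking is omitted.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_mutex_t mutex;
static pthread_cond_t cv_thread; /* new job available, or time to exit */
static pthread_cond_t cv_main;   /* a worker completed its job */
static int job_number, completions, exit_condition, n;
static int total_reports;        /* hypothetical counter, only for checking */

static void *worker(void *args) {
    int num = (int)(intptr_t)args;
    int last_job = 0;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while (last_job >= job_number && !exit_condition)
            pthread_cond_wait(&cv_thread, &mutex);
        if (exit_condition) {
            pthread_mutex_unlock(&mutex);
            break;
        }
        last_job = job_number; /* take on the new job */
        pthread_mutex_unlock(&mutex);

        printf("Report from: %d. This thread is executing job %d\n", num, last_job);

        pthread_mutex_lock(&mutex);
        completions++;
        total_reports++;
        pthread_cond_signal(&cv_main); /* only the calling thread waits on cv_main */
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

/* Starts n_threads workers, drives them through `cycles` jobs from the
   calling thread, and returns the total number of reports. */
int run_cycles_cv(int n_threads, int cycles) {
    n = n_threads;
    job_number = completions = exit_condition = total_reports = 0;

    /* The initialization the answer leaves to main. */
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cv_thread, NULL);
    pthread_cond_init(&cv_main, NULL);

    pthread_t *workers = malloc(n * sizeof *workers);
    for (int i = 0; i < n; i++)
        pthread_create(&workers[i], NULL, worker, (void *)(intptr_t)i);

    for (int c = 0; c < cycles; c++) {
        pthread_mutex_lock(&mutex);
        job_number++;
        completions = 0;
        pthread_cond_broadcast(&cv_thread);      /* a new job is ready */
        while (completions < n)
            pthread_cond_wait(&cv_main, &mutex); /* wait for every worker */
        pthread_mutex_unlock(&mutex);
        printf("All reported\n");
    }

    pthread_mutex_lock(&mutex);
    exit_condition = 1;
    pthread_cond_broadcast(&cv_thread);          /* tell all workers to exit */
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < n; i++)
        pthread_join(workers[i], NULL);
    pthread_cond_destroy(&cv_main);
    pthread_cond_destroy(&cv_thread);
    pthread_mutex_destroy(&mutex);
    free(workers);
    return total_reports;
}
```

Calling run_cycles_cv(4, 5) from main would run 5 cycles with 4 workers, for 20 reports in total. For globals like these, the static initializers PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER would also work in place of the explicit init calls.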