Lately I followed an Operating Systems course that pointed me to the barrier pseudocode from The Little Book of Semaphores. For a few hours now I have been struggling to implement this barrier, and I can't seem to understand it properly. To understand it, I tried a simple program that lets threads arrive at the barrier and, once all threads have arrived, lets them pass. Here's my code:
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
/* Number of worker threads: sizes the thread array and bounds the loops in main(). */
#define NR_MAX 5
/* Arrival count that bariera() compares `entered` against. */
int n=NR_MAX;
/* Shared arrival counter used by bariera(). */
int entered = 0;
/* Mutex intended to guard `entered`; initialised in main(). */
pthread_mutex_t mtx;
/* Semaphore the threads wait on and post in bariera(); initialised in main(). */
sem_t smph;
/*
 * Thread entry point: the asker's first attempt at the barrier.
 *
 * Counts the calling thread as arrived, prints the running count, and then
 * passes through the semaphore (a wait immediately followed by a post).  The
 * thread that observes entered == n posts the semaphore once beforehand and
 * prints "Out".
 *
 * v is unused.
 *
 * NOTE(review): this is the code the question is about.  As posted, `entered`
 * is compared with `n` after the mutex has been released, "Out" is printed
 * only by the thread that sees the final count, and the function reaches its
 * closing brace without returning a value.
 */
void* bariera(void *v){
    pthread_mutex_lock(&mtx);
    entered++;  /* count this thread's arrival */
    printf("thread %d have entered\n", entered);
    pthread_mutex_unlock(&mtx);
    if(entered == n) {
        sem_post(&smph);
        printf("Out %d \n", entered);
    }
    sem_wait(&smph);
    sem_post(&smph);
}
/*
 * Initialises the mutex and the semaphore, starts NR_MAX threads running
 * bariera(), and joins every one of them before exiting.
 *
 * NOTE(review): the semaphore is created with an initial value of 1 here,
 * exactly as posted in the question.
 */
int main() {
    pthread_t thr[NR_MAX];
    pthread_mutex_init(&mtx, NULL);
    sem_init(&smph, 0, 1);
    /* One thread per slot; the index must advance or the loop never ends. */
    for (int i=0; i<NR_MAX; i++){
        pthread_create(&thr[i], NULL, bariera, NULL);
    }
    for(int i=0; i<NR_MAX; i++){
        pthread_join(thr[i], NULL);
    }
    return 0;
}
How should this actually be implemented? For now, it only prints the order in which the threads arrive at the barrier, and then prints only the last one that arrived.
EDIT: Totally forgot, here's the pseudocode:
n = the number of threads
count = 0 - keeps track of how many threads arrived at the barrier
mutex = Semaphore(1) - provides exclusive access to count
barrier = Semaphore(0) - barrier is locked (zero or negative) until all threads arrive; then it should be unlocked (1 or more)
1 rendezvous
2
3 mutex.wait()
4 count = count + 1
5 mutex.signal ()
6
7 if count == n: barrier.signal ()
8
9 barrier.wait()
10 barrier.signal ()
11
12 critical point
expected output:
Out 5
Out 4
Out 3
Out 2
Out 1
(the order doesn't have to be the same)
Actual output:
Out 5
CodePudding user response:
Five issues:
- Incorrectly initialized semaphore.
- Accessing `entered` outside of the critical section.
- Misplaced `printf`.
- Missing `return`.
- Missing increment in for loops.
/*
 * Thread entry point implementing a one-shot barrier.
 *
 * v carries the thread's integer id, packed into the pointer by the creator.
 * Each thread counts itself in under the mutex; the thread that brings the
 * count up to n posts the semaphore once, and every thread then waits on the
 * semaphore and immediately re-posts it so the next waiter is released too.
 *
 * Returns NULL.
 */
void* bariera(void *v) {
   int id = (int)(uintptr_t)v;
   printf("[%d] Before barrier.\n", id);

   pthread_mutex_lock(&mtx);
   if (++entered == n) {    // Count this arrival; the last one opens the barrier.
      sem_post(&smph);      // Wake up a thread.
   }
   pthread_mutex_unlock(&mtx);

   sem_wait(&smph);         // Barrier.
   sem_post(&smph);         // Wake up another thread.

   // Do something after the barrier.
   printf("[%d] After barrier.\n", id);
   return NULL;
}
sem_init(&smph, 0, 0); // Should initially be zero.
for (int i=0; i<NR_MAX; i) {
pthread_create(&thr[i], NULL, bariera, (void*)(intptr_t)i);
}
Output:
[0] Before barrier.
[2] Before barrier.
[3] Before barrier.
[4] Before barrier.
[1] Before barrier.
[1] After barrier.
[0] After barrier.
[3] After barrier.
[2] After barrier.
[4] After barrier.
That leaves the barrier semaphore un-reusable, because it is posted n + 1 times but waited on only n times, so it ends with a value of 1 instead of 0.
To leave it back in its original state, we need to post only n
times.
/*
 * Thread entry point implementing the barrier with exactly n posts.
 *
 * v carries the thread's integer id, packed into the pointer by the creator.
 * Each thread counts itself in under the mutex.  The thread that brings the
 * count up to n posts the semaphore n times, once for every thread's
 * sem_wait (its own included), so the semaphore's value is back at zero
 * after all n threads have passed.
 *
 * Returns NULL.
 */
void* bariera(void *v) {
   int id = (int)(uintptr_t)v;
   printf("[%d] Before barrier.\n", id);

   pthread_mutex_lock(&mtx);
   if (++entered == n) {          // Count this arrival; the last one releases everybody.
      for (int i=n; i--; ) {
         sem_post(&smph);         // Wake up every thread.
      }
   }
   pthread_mutex_unlock(&mtx);

   sem_wait(&smph);               // Barrier.

   // Do something after the barrier.
   printf("[%d] After barrier.\n", id);
   return NULL;
}
CodePudding user response:
With C11 atomic types, you don't actually need a separate mutex to protect access to the barrier counter, as demonstrated below. This version also encapsulates the barrier-related variables and operations in a struct and functions, and doesn't require the last thread that hits the barrier to wait on the semaphore as well.
#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
/*
 * Counting barrier built from a C11 atomic counter and a POSIX semaphore.
 */
struct barrier {
  int maxcount;       // Number of threads that must arrive before anyone proceeds
  _Atomic int count;  // Arrivals still outstanding; starts at maxcount
  sem_t sem;          // Early arrivals block here until the last thread posts
};

/*
 * Prepare *b for `count` participating threads.  The semaphore starts at
 * zero so that every thread except the last one blocks in barrier_wait().
 */
void barrier_init(struct barrier *b, int count) {
  b->maxcount = count;
  b->count = count;
  sem_init(&b->sem, 0, 0);
}

/* Release the semaphore owned by *b. */
void barrier_destroy(struct barrier *b) {
  sem_destroy(&b->sem);
}

/*
 * Block until `maxcount` threads have called barrier_wait() on *b.
 *
 * The last thread to arrive does not wait: it posts the semaphore once for
 * each of the other maxcount - 1 threads and returns immediately.
 */
void barrier_wait(struct barrier *b) {
  // Atomically subtract a number and return the *old* value
  if (atomic_fetch_sub_explicit(&b->count, 1, memory_order_acq_rel) == 1) {
    // Wake up all waiting threads as they're all at the barrier now
    for (int n = 0; n < b->maxcount - 1; n += 1) {
      sem_post(&b->sem);
    }
  } else {
    sem_wait(&b->sem); // Actual barrier; wait for wakeup
  }
}
/*
 * Thread entry point: announce the calling thread, pass through the barrier
 * handed in via vb (a struct barrier *), then announce it again.
 * Always returns NULL.
 */
void* wait_func(void *vb) {
  struct barrier *bar = vb;
  // The id is truncated to unsigned purely for display purposes.
  unsigned self = (unsigned)pthread_self();

  printf("Thread 0x%x before barrier.\n", self);
  barrier_wait(bar);
  printf("Thread 0x%x after barrier.\n", self);

  return NULL;
}
#define NTHREADS 5
/*
 * Demo driver: starts NTHREADS threads that all meet at one barrier, joins
 * them, and tears the barrier down.
 *
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main(void) {
  pthread_t threads[NTHREADS];
  struct barrier b;

  barrier_init(&b, NTHREADS);

  for (int n = 0; n < NTHREADS; n += 1) {
    int rc = pthread_create(&threads[n], NULL, wait_func, &b);
    if (rc != 0) {
      // The barrier can never fill without every thread, so give up;
      // returning from main ends the process and any threads already started.
      fprintf(stderr, "pthread_create failed: %d\n", rc);
      return 1;
    }
  }

  for (int n = 0; n < NTHREADS; n += 1) {
    pthread_join(threads[n], NULL);
  }

  barrier_destroy(&b);
  return 0;
}