Home > Software engineering >  How do I synchronize threads similar to a barrier, but with the caveat of the amount of threads bein
How do I synchronize threads similar to a barrier, but with the caveat of the amount of threads bein

Time:02-04

I have tried to create a function that works similar to a barrier function, except it can handle the active amount of threads changing. (I can't seem to get it to work either by destroying and reinitializing the barrier whenever a thread exits the function loop).

My issue is that I can't get my replacement function to run properly, i.e. the program softlocks for some reason. So far nothing I've tried has worked to ensure that the threads are synchronized and the program doesn't softlock. I've tried using barriers, I've tried making the exiting threads enter barrier wait as well, to help with the barriers (but I couldn't figure out how to not softlock with the exiting threads, as I always ended up with some thread(s) invariably being left inside the barrier_wait function).

This is my replacement function for the pthread_barrier_wait function:

void SynchThreads()
{
    pthread_mutex_lock(&lock);
    if (threadsGoingToWait < maxActiveThreads)
    {
        threadsGoingToWait  ;
        pthread_cond_signal(&condVar2);
        pthread_cond_wait(&condVar1, &lock);
    } else
    {
        threadsGoingToWait=1;
        pthread_cond_broadcast(&condVar1);
    }
    pthread_mutex_unlock(&lock);
}

To change the value of maxActiveThreads, I have the threads do the following before they exit the function loop:

    pthread_mutex_lock(&tlock);
    maxActiveThreads--;
    if (maxActiveThreads>0)
    {
        pthread_cond_wait(&condVar2, &tlock);
        pthread_cond_broadcast(&condVar1);
    }
    else pthread_cond_broadcast(&condVar2);
    pthread_mutex_unlock(&tlock);

I have the pthread variables initialized before the thread creation as this:

    pthread_barrier_init(&barrier, NULL, maxActiveThreads);
    pthread_mutex_init(&lock, NULL);
    pthread_mutex_init(&tlock, NULL);
    pthread_cond_init(&condVar1, NULL);
    pthread_cond_init(&condVar2, NULL);

I have no clue why the program is softlocking right now, since as far as I know, so long as there's at least 1 thread either remaining or in the waiting fireld, it should release the other threads from the cond_wait they're in.

Edit: If I remove the condVar2 from being used, and instead end the function loop with a barrier_wait, the program no longer softlocks, however it still doesn't function as if it's being synchronized properly. To give some more detail as to what I'm working on: I'm trying to make a sequential Gaussian elimination function parallel. So the issues I've had so far is that either the matrix has the wrong values, or the vectors have the wrong values, or they all have the wrong values. I was hoping by having synchronization points distributed as following would fix the issue of synchronization errors:

static void* gauss_par(void* params)
{
/*getting the threads and the related data*/
    for (int k = startRow; k < N; k =threadCount) /* Outer loop */
    {
        SynchThreads();
        /* Division step */
        SynchThreads();
        /* Vector y and matrix diagonal */
        SynchThreads();
        for (int i = k 1; i < N; i  )
        {
            /* Elimination step */
            SynchThreads();
            /* Vector b and matrix zeroing */
            SynchThreads();
        }
    }
}

CodePudding user response:

As a preliminary, I see &lock in your SyncThreads() and &tlock in your other code snippet. These almost certainly do not go with each other, because proper protection via a mutex relies on all threads involved using the same mutex to guard access to the data in question. I'm having trouble coming up with a way, in C, that the expressions &lock and &tlock could evaluate to pointers of type pthread_mutex_t *, pointing to the same object. Unless maybe one of lock and tlock were a macro expanding to the other, which would be nasty.

With that said,

I have tried to create a function that works similar to a barrier function, except it can handle the active amount of threads changing. (I can't seem to get it to work either by destroying and reinitializing the barrier whenever a thread exits the function loop).

Destroying and reinitializing the (same) barrier object should work if you can ensure that no thread is ever waiting at the barrier when you do so or arrives while you are doing so. But in practice, that's often a difficult condition to ensure.

Barriers are a somewhat specialized synchronization tool, whereas the Swiss army knife of thread synchronization is the condition variable. This appears to be the direction you've taken, and it's probably a good one. However, I suspect you're looking at CVs from a functional perspective rather than a semantic one. This leads to all sorts of issues.

The functional view is along these lines: a condition variable allows threads to block until another thread signals them.

The semantic view is this: a condition variable allows threads to block until some testable condition is satisfied. Generally, that condition is a function of one or more shared variables, and access to those is protected by the same mutex as is used with the CV. The same CV can be used by different threads at the same time to wait for different conditions, and that can make sense when the various conditions are all related to the same data.

The semantic view guides you better toward appropriate usage idioms, and it helps you come to the right answers on questions about who should signal or broadcast to the CV, under what conditions, and even whether to use a CV at all.

Diagrammatically, the basic usage pattern for a CV wait is this:

                 |
                 |                                
                 V               CRITICAL REGION  
             - - - - -                            
            : optional :                          
              - - - - -                           
                 |                                
                 V                                
   -----------------------------                  
  | Is the condition satisfied? | <-              
   -----------------------------    |             
       |                   |        |             
       | Yes               | No     | on waking   
       V                   V        |             
   - - - - -           ---------    |             
  : optional :        | CV wait |---              
    - - - - -          ---------                  
       |                                          
       |                                          
       V

In particular, since the whole point is that threads don't proceed past the wait unless the condition is satisfied, it is essential that they check after waking whether they should, in fact, proceed. This protects against three things:

  1. the thread was awakened via a signal / broadcast even though the condition it is waiting for was not satisfied;

  2. the thread was awakened via a signal / broadcast but by the time it gets a chance to run, the condition it was waiting for is no longer satisfied; and

  3. the thread woke despite the CV not having been signaled / broadcasted to (so there's no reason to expect the condition to be satisified, though it's ok to continue if it happens to be satisfied anyway).

With that in mind, let's consider your pseudo-barrier function. In English, the condition for a thread passing through would be something like "all currently active threads have (re-)reached the barrier since the last time it released." I take maxActiveThreads to be the number of currently active threads (and thus, slightly misnamed). There are some simple but wrong ways to implement that condition, most of them falling over in the event that a thread passes through the barrier and then returns to it before some of the other threads have passed through. A simple counter of waiting threads is not enough, given threads' need to check the condition before proceeding.

One thing you can do is switch which variable you use for a waiter count between different passages through the barrier. That might look something like this:

int wait_count[2];
int wait_counter_index;

void cycleBarrier() {
    // this function is to be called only while holding the mutex locked
    wait_counter_index = !wait_counter_index;  // flip between 0 and 1
    wait_count[wait_counter_index] = 0;
}

void SynchThreads() {
    pthread_mutex_lock(&lock);

    // A thread-specific copy of the wait counter number upon arriving
    // at the barrier
    const int my_wait_index = wait_counter_index;

    if (  wait_count[my_wait_index] < maxActiveThreads) {
        do {
            pthread_cond_wait(&condVar1, &lock);
        } while (wait_count[my_wait_index] < maxActiveThreads);
    } else {
        // This is the thread that crested the barrier, and the first one
        // through it; prepare for the next barrier passage
        assert(wait_counter_index == my_wait_index);
        cycleBarrier();
    }

    pthread_mutex_unlock(&lock);
}

That flip-flops between two waiter counts, so that each thread can check whether the count for the barrier passage it is trying to make has been reached, while also providing for threads arriving at the next barrier passage.

Do note also that although this can accommodate the number of threads decreasing, it needs more work if you need to accommodate the number of threads increasing. If the thread count is increased at a point where some threads have passed the barrier but others are still waiting to do so then the ones still waiting will erroneously think that their target thread count has not yet been reached.

Now let's consider your strategy when modifying maxActiveThreads. You've put a wait on condVar2 in there, so what condition does a thread require to be satisfied in order to proceed past that wait? The if condition guarding it suggests maxActiveThreads <= 0 would be the condition, but surely that's not correct. In fact, I don't think you need a CV wait here at all. Acquiring the mutex in the first place is all a thread ought to need to do to be able to reduce maxActiveThreads.

However, having modified maxActiveThreads does mean that other threads waiting for a condition based on that variable might be able to proceed, so broadcasting to condVar1 is the right thing to do. On the other hand, there's no particular need to signal / broadcast to a CV while holding its associated mutex locked, and it can be more efficient to avoid doing so. Also, it's cheap and safe to signal / broadcast to a CV that has no waiters, and that can allow some code simplification.

Lastly, what happens if all the other threads have already arrived at the barrier? In the above code, it is the one to crest the barrier that sets up for the next barrier passage, and if all the other threads have reached the barrier already, then that would be this thread. That's why I introduced the cycleBarrier() function -- the code that reduces maxActiveThreads might need to cycle the barrier, too.

That leaves us with this:

    pthread_mutex_lock(&lock);
    maxActiveThreads--;
    if (wait_count[wait_counter_index] >= maxActiveThreads) {
        cycleBarrier();
    }
    pthread_mutex_unlock(&lock);
    pthread_cond_broadcast(&condVar1);

Finally, be well aware that all these mutex and cv functions can fail. For robustness, a program must check their return values and take appropriate action in the event of failure. I've omitted all that for clarity and simplicity, but I am rigorous about error checking in real code, and I advise you to be, too.

  • Related