Home > Software engineering >  Two threads incrementing a number
Two threads incrementing a number

Time:12-17

Here's a test assignment I was given that I apparently failed:
1.Use two threads to increment an integer. Thread A increments when even and Thread B increments when odd (for the integer problem we can have it specify up to a number provided on the command line)
1a. What are some of the difficulties in adding more threads? Please show the difficulties with code.
1b. Extra credit – Design an improved solution to the above that can scale with many threads

The feedback after the first try was "didn't address atomic modification and false sharing". I attempted to address them, but there was no feedback for the second attempt. I want to use this test to learn, so I figured I'd ask the topmost experts - you. Thanks in advance. The following is the header for the first try:

#include <iostream>
#include <mutex>
#include <atomic>

class CIntToInc
{
private:
 int m_nVal; //std::atomic<int> m_nVal;
 int m_nMaxVal;
public:
 CIntToInc(int p_nVal, int p_nMaxVal) : m_nVal(p_nVal), m_nMaxVal(p_nMaxVal) { }
 const int GetVal() const { return m_nVal; }
 const int GetMaxVal() const { return m_nMaxVal; }
 void operator   () {   m_nVal; }
};

struct COper
{
 enum class eOper { None = 0, Mutex = 1, NoMutex = 2 };
 eOper m_Oper;
public:
 friend std::istream& operator>> (std::istream &in, COper &Oper);
 bool operator == (const eOper &p_eOper) { return(m_Oper == p_eOper); }
};

The following is the source for the first try. It includes my thoughts as to why the solution would work. I compiled the code in MSVS2012.

// Notes: 
// 1a.
// Since an integer cannot be an odd number and an even number at the same time, thread separation happens naturally when each thread checks the value.
// This way no additional synchronization is necessary and both threads can run at will, provided that it's all they are doing.
// It's probably not even necessary to declare the target value atomic because it changes (and thus lets the other thread increment itself) only at the last moment.
// I would still opt for making it atomic.
// Adding more threads to this setup immediately creates a problem with threads of equal condition (even or odd) stepping on each other.
// 1b.
// By using a mutex threads can cleanly separate. Many threads with the same condition can run concurrently.
// Note: there is no guarantee that each individual thread from a pool of equally conditioned threads will get to increment the number.
// For this method reading has to be inside the mutext lock to prevent a situation where a thread may see the value as incrementable, yet when it gets to it, the value has already 
// been changed by another thread and no longer qualifies.
// cout message output is separated in this approach.
// 
// The speed of the "raw" approach is 10 times faster than that of the mutex approach on an equal number of threads (two) with the mutex time increasing further as you add threads.
// Use 10000000 for the max to feel the difference, watch the CPU graph
//
// If the operation is complex and time consuming, the approach needs to be different still. The "increment" functionality can be wrapped up in a pimpl class, a copy can be made
// and "incremented". When ready, the thread will check for whether the value has changed while the operation was being performed on the copy and, if not, a fast swap under the mutex
// could be attempted. This approach is resource-intensive, but it mininuzes lock time.
//
// The approach above will work if the operation does not involve resources that cannot be easily copied (like a file to the end of which we are writing)
// When such resources are present, the algorithm probably has to implement a thread safe queue.
// END

#include "test.h"
#include <thread>

int main_test();

int main(int argc, char* argv[])
{
 main_test();
 return(0);
}

void IncrementInt2(CIntToInc &p_rIi, bool p_bIfEven, const char *p_ThreadName, std::mutex *p_pMu)
// the version that uses a mutex
// enable cout output to see thread messages
{
 int nVal(0);
 while(true) {
   p_pMu->lock();
   bool DoWork = (nVal = p_rIi.GetVal() < p_rIi.GetMaxVal());
   if(DoWork) {
     //std::cout << "Thread " << p_ThreadName << ": nVal=" << nVal << std::endl;
     if((!(nVal % 2) && p_bIfEven) || (nVal % 2 && !p_bIfEven)) {
      //std::cout << "incrementing" << std::endl;
        p_rIi; } }
   p_pMu->unlock();
   if(!DoWork) break;
   //if(p_bIfEven) // uncomment to force threads to execute differently
   // std::this_thread::sleep_for(std::chrono::milliseconds(10));
   }
}

void IncrementInt3(CIntToInc &p_rIi, bool p_bIfEven, const char *p_ThreadName)
// the version that does not use a mutex
// enable cout output to see thread messages. Message text output is not synchronized
{
 int nVal(0);
 while((nVal = p_rIi.GetVal()) < p_rIi.GetMaxVal()) {
   //std::cout << "Thread " << p_ThreadName << ": nVal=" << nVal << std::endl;
   if((!(nVal % 2) && p_bIfEven) || (nVal % 2 && !p_bIfEven)) {
    //std::cout << "Thread " << p_ThreadName << " incrementing" << std::endl;
      p_rIi; }
    }
}

std::istream& operator>> (std::istream &in, COper &Oper)
// to read operation types from cin
{
 int nVal;
 std::cin >> nVal;
 switch(nVal) {
   case 1: Oper.m_Oper = COper::eOper::Mutex; break;
   case 2: Oper.m_Oper = COper::eOper::NoMutex; break;
   default: Oper.m_Oper = COper::eOper::None; }
 return in;
}

int main_test()
{
 int MaxValue, FinalValue;
 COper Oper;
 std::cout << "Please enter the number to increment to: ";
 std::cin >> MaxValue;
 std::cout << "Please enter the method (1 - mutex, 2 - no mutex): ";
 std::cin >> Oper;

 auto StartTime(std::chrono::high_resolution_clock::now());

 if(Oper == COper::eOper::Mutex) {
   std::mutex Mu;
   CIntToInc ii(0, MaxValue);
   std::thread teven(IncrementInt2, std::ref(ii), true, "Even", &Mu);
   std::thread todd(IncrementInt2, std::ref(ii), false, "Odd", &Mu);
   // add more threads at will, should be safe
   //std::thread teven2(IncrementInt2, std::ref(ii), true, "Even2", &Mu);
   //std::thread teven3(IncrementInt2, std::ref(ii), true, "Even3", &Mu);
   teven.join();
   todd.join();
   //teven2.join();
   //teven3.join();
   FinalValue = ii.GetVal();
   }
 else if(Oper == COper::eOper::NoMutex) {
   CIntToInc ii(0, MaxValue);
   std::thread teven(IncrementInt3, std::ref(ii), true, "Even");
   std::thread todd(IncrementInt3, std::ref(ii), false, "Odd");
   teven.join();
   todd.join();
   FinalValue = ii.GetVal(); }

 std::chrono::duration<double>elapsed_seconds = (std::chrono::high_resolution_clock::now() - StartTime);
 std::cout << "main_mutex completed with nVal=" << FinalValue << " in " << elapsed_seconds.count() << " seconds" << std::endl;

 return(0);
}

For the second attempt I made the following changes to the header:
made m_nVal std::atomic
used atomic methods to increment and retrieve m_nVal
separated m_nVal from the read-only m_nMaxVal by a filler
The source file was unchanged. The new header is below.

#include <iostream>
#include <mutex>
#include <atomic>
class CIntToInc
{
private:
 int m_nMaxVal;
 char m_Filler[64 - sizeof(int)]; // false sharing prevention, assuming a 64 byte cache line
 std::atomic<int> m_nVal;

public:
 CIntToInc(int p_nVal, int p_nMaxVal) : m_nVal(p_nVal), m_nMaxVal(p_nMaxVal) { }
 const int GetVal() const { 
   //return m_nVal;
   return m_nVal.load(); // std::memory_order_relaxed);
   }
 const int GetMaxVal() const { return m_nMaxVal; }
 void operator   () { 
   //  m_nVal;
   m_nVal.fetch_add(1); //, std::memory_order_relaxed); // relaxed is enough since we check this very variable
   }
};

struct COper
{
 enum class eOper { None = 0, Mutex = 1, NoMutex = 2 };
 eOper m_Oper;
public:
 friend std::istream& operator>> (std::istream &in, COper &Oper);
 bool operator == (const eOper &p_eOper) { return(m_Oper == p_eOper); }
};

I don't know if the approach is fundamentally wrong or if there is one or more smaller errors. Any help will be much appreciated.

CodePudding user response:

Your reasoning for why you don't need to synchronize is flawed. You do need to synchronize, even if each threads would naturally alternate over who is the writer. As Pete Becker said, a writer and a reader without synchronization is undefined behavior. You can't predict how it will break, but sometimes it can be seen by the optimizer making assumptions about your code, and do bad things:

Here a thread sets keep_going to false, immediately, which "should" stop the loop:

int main() {
    bool keep_going = true;
    unsigned x = 999;

    auto thr = std::thread([&]() mutable { 
        keep_going = false;  // unsync write ...
    });   

    while (keep_going) {     // ... unsync read - undefined behavior
         x;
    }

    thr.join();
    std::cout << x << std::endl;
}

Live: https://godbolt.org/z/P1rnf8s71

However, it never stops running with g ! Why? The loop optimizer sees a couple of things:

  1. though keep_going is used in a lambda, it does not reason about that running in a background thread because there is no synchronization.
  2. therefore, by the time it gets to the loop, if the lambda was going to change it, it already has.
  3. since nothing writes to keep_going, it state is not going to change by the time it gets to the loop, and so the test can be hoisted outside the loop.
  4. similarly, since the loop cannot be exited, and the loop only writes to x, it is unobservable if it did not write to x, so it removes that wasted work.

The optimizer thus works with AS IF it it was equivalent to this:

bool keep_going = true;
call_ordinary_function(keep_going);
if (keep_going) {
   top:
   goto top;
}

And the generated assembly reflects this:

        call    [QWORD PTR [rax 8]]
.L7:
        cmp     BYTE PTR [rsp 31], 0
        je      .L30

.L8:
        jmp     .L8    <<<< truly infinite loop

.L30:

Not what you expected?

But declaring the boolean atomic changes everything:

std::atomic<bool> keep_going = true;

Now the generated code is:

.L7:
        mov     ebx, 999
        jmp     .L8
.L11:
        add     ebx, 1
.L8:
        movzx   eax, BYTE PTR [rsp 31]
        test    al, al
        jne     .L11
        lea     rdi, [rsp 32]

So now we see:

  1. x is now incremented (since the loop can terminate, so changes to x is visible after the loop),
  2. we keep loading the value of keep_going, reading it into eax and actually check it in the loop.
  3. it actually terminates.

I hope this convinced you that even if you reason about it not being necessary, the generated code might not be what you think.

CodePudding user response:

First of all, the critical section (lock unlock) is contains the odd/even check and is done in an active loop. Thus, the two threads will competitively try to lock the mutex although only one should do that. In the worst case, the thread 1 increments the value and then busy lock unlock the mutex (to actively execute the check) while the other thread 2 waits a very long time to be able to lock the value increment the value. This situation if far from being theoretical since the thread 1 will often have the priority on the mutex (due to the way CPU caches and operating systems work).

One way to address this is issue is to use condition variables. The idea is to lock a mutex, then increment the value, then signal the next thread that is can increment the value and then wait to be waked up by a thread. This solution scale well, but it is often slow if the work is very small because the waits cause some unwanted delays (typically due context switches). When the number of threads is much bigger than the number of core, this solution is quite efficient. This cost can be reduced using a busy read on an atomic variable when the number of thread is small (or when there is many threads and when it is about to be the turn of the thread).

Another solution would be to use two (binary) semaphore. In the beginning, one is acquired and one is not. Each thread try to acquire its own semaphore, increment the integer and then release the other one resulting in a ping-pong-like execution.

False sharing is the least of your problems in your first try. Indeed, while there is possibly a false sharing between the mutex and the incremented integer, this is not a problem because the mutex protect the integer (and is also responsible for the visibility of the integer between threads).

Note that you can use a lock_guard to make the code more safe and easier to read. Moreover, te condition (!(nVal % 2) && p_bIfEven) || (nVal % 2 && !p_bIfEven) is much more complex than it should. Consider using (nVal % 2) ^ p_bIfEven.

For the second try, it is not clear whether you use an atomic with a mutex. Note that there is no need to use them together. In fact it is a bad idea due to the additional overhead caused by atomics. That being said, if you choose to use only an atomic variable, then you need a (weak) compare and swap so to check the value of the atomic variable and change it atomically. This solution is fast as long as the number of thread is smaller than the number of core (due to the busy waiting).

Regarding the false sharing in the second try, m_Filler is not enough to guarantee that there is no false sharing (and is not very straightforward too). Indeed, what is stored after the std::atomic may cause false sharing (std::atomic is not guaranteed to prevent false sharing using some padding and actually often does not). You can fix that using alignas(64) std::atomic<int> m_nVal; alignas(64) char padding;. Note that using 64 is architecture-dependant and alignas(std::hardware_destructive_interference_size) should theoretically be used instead.

  • Related