Fast processing of std::bitset<65536> in parallel

Once there was a question that I wrote a huge answer to, but the question was deleted and its author refused to undelete it.

So I'm posting a short summary of that question here, and immediately answering it myself, just to share my results.

The question was: given a std::bitset<65536> that is processed (by some formula) bit-by-bit inside an inner loop, how can we speed up this computation?

The outer loop just calls the inner loop many times (let's say 50,000 times), and the outer loop can't be parallelized, because each iteration depends on the results of the previous iteration.

Example code of this process:

std::bitset<65536> bits{};
uint64_t hash = 0;
for (size_t i = 0; i < 50000; ++i) {
    // Process Bits
    for (size_t j = 0; j < bits.size(); ++j)
        bits[j] = ModifyBit(i, j, hash, bits[j]);
    hash = Hash(bits, hash);
}

The code above is just one sample way of processing; it is not the real case. The real case is that we repeatedly process a std::bitset<65536> in some way such that all bits can be processed independently.

The question is how to process the bits inside the inner loop in parallel, as fast as possible.

One important note: the formula that modifies the bits is generic, meaning we don't know it in advance and can't turn it into SIMD instructions.

But what we do know is that all bits can be processed independently, and that this processing needs to be parallelized. Also, we can't parallelize the outer loop, as each of its iterations depends on the results of the previous one.

Another note: std::bitset<65536> is quite small, just 1K of 64-bit words. This means that directly using a pool of std::thread or std::async threads will not work, because each thread's portion of the work takes only around 50-200 nanoseconds, far too little time to amortize starting and stopping threads and sending work to them. Even std::mutex takes 75 nanoseconds on my Windows machine (although 20 nanoseconds on Linux), so using std::mutex is also a big overhead.
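
For reference, numbers like these come from an uncontended lock/unlock micro-benchmark, e.g. the following minimal sketch (absolute figures vary by OS, CPU and standard library):

#include <chrono>
#include <iostream>
#include <mutex>

int main() {
    std::mutex m;
    constexpr int N = 10'000'000;
    auto const tb = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < N; ++i) {
        m.lock();   // uncontended, so this measures pure lock/unlock overhead
        m.unlock();
    }
    auto const te = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration<double, std::nano>(te - tb).count() / N
              << " ns per lock/unlock" << std::endl;
}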

CodePudding user response:

I implemented quite a large and complex solution for your task, but it works very fast. On my 4-core (8 hardware threads) laptop I get a 6x multi-core speedup compared to the single-threaded version (your version of the code).

The main idea of the solution below is to implement a very fast multi-core thread pool for running arbitrary tasks, with small overhead. My implementation can handle up to 1-10 million tasks per second (depending on CPU speed and core count).

The regular way of starting multiple tasks asynchronously is by using std::async, or just by creating std::thread objects. Both ways are considerably slower than my own implementation: they can't reach a throughput of 5 million tasks per second the way my implementation can. And your code needs millions of tasks per second to run for good speed. That's why I implemented everything from scratch.

With a fast thread pool implemented, we can now slice your 64K bitset into smaller sub-sets and process those sub-sets in parallel. I sliced the 64K bitset into 16 equal parts (see BitSize / 16 in the code); you can choose another number of parts equal to a power of two, but not too many, otherwise the thread pool overhead becomes too large. Usually it is good to slice into a number of parts equal to twice the number of hardware threads (or 4 times the number of cores), as in the sketch below.
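
A minimal sketch of that rule of thumb (my illustration, not part of the solution code; std::bit_ceil requires C++20, and rounding up to a power of two keeps slice boundaries aligned to 64-bit words when the bitset size is itself a power of two):

#include <bit>
#include <thread>

size_t SliceCount() {
    // Twice the number of hardware threads, rounded up to a power of two.
    return std::bit_ceil(size_t(2) * std::thread::hardware_concurrency());
}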

I implemented several classes in the C++ code. The AtomicMutex class uses std::atomic_flag to implement a very fast mutex replacement based on spin-locking. This AtomicMutex is used to protect the queue of tasks submitted for running on the thread pool.
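
Usage of AtomicMutex is RAII-style, much like std::lock_guard (a small sketch using the class as defined in the code below):

AtomicMutex m;
{
    auto lock = m.Locker();  // spins until the flag is acquired
    // ... critical section protected by the spin-lock ...
}                            // releases the flag at scope exit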

The RingBuffer class is based on std::vector and implements a simple and fast queue for storing arbitrary objects. It is implemented with two indices (head and tail) pointing into the vector. When a new element is added to the queue, the tail index advances to the right; if it reaches the end of the vector, it wraps around to position 0. Similarly, when an element is taken out of the queue, the head index advances to the right, also with wrap-around. The RingBuffer is used to store the thread pool's tasks.

The Queue class is a wrapper around RingBuffer, with AtomicMutex protection added. This spin-lock mutex guards simultaneous adding/taking of elements to/from the queue by multiple worker threads.

Pool implements the multi-core pool of tasks itself. It creates as many worker threads as there are CPU hardware threads (double the number of cores), minus one. Each worker thread polls the queue for new tasks and executes them immediately, while the main thread adds new tasks to the queue. Pool also provides Wait() to block until all current tasks are finished; this waiting is used as a barrier until the whole 64K bitset is processed (all sub-parts are done). Pool accepts any lambdas (function closures) to run: you can see that the 64K bitset, sliced into smaller parts, is processed by calling pool.Emplace(lambda), and later pool.Wait() is used to wait until all sub-parts are finished. Exceptions from pool workers are collected and reported to the user if any error occurs. While in Wait(), the pool also runs tasks on the main thread, so as not to waste one core on merely waiting for tasks to finish.
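
A minimal usage sketch of this Pool interface (the names are those defined in the code below; the atomic counter is only for the example):

Pool pool;                                 // starts hardware_concurrency() - 1 workers
std::atomic<int> done{0};
for (int k = 0; k < 16; ++k)
    pool.Emplace([&done]{ ++done; });      // enqueue one lambda task
pool.Wait();                               // barrier: all 16 tasks have finished here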

The timings reported in the console are measured with std::chrono.

Both versions can be run: single-threaded (your original version) and multi-threaded using all cores. Switching between single and multi is done by passing the MultiThreaded = true template parameter to the function ProcessBitset(), as in the usage sketch below.
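
A usage sketch of that switch (pool and bset refer to the same objects as in Compute() below):

ProcessBitset<true>(pool, bset, businessLogicCriteria);   // multi-threaded, all cores
ProcessBitset<false>(pool, bset, businessLogicCriteria);  // single-threaded baseline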

Try it online!

#include <cstdint>
#include <atomic>
#include <vector>
#include <array>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <exception>
#include <optional>
#include <memory>
#include <iostream>
#include <iomanip>
#include <bitset>
#include <string>
#include <chrono>
#include <algorithm>
#include <any>
#include <type_traits>

class AtomicMutex {
    class LockerC;
public:
    void lock() {
        while (f_.test_and_set(std::memory_order_acquire))
            //f_.wait(true, std::memory_order_acquire)
        ;
    }
    void unlock() {
        f_.clear(std::memory_order_release);
        //f_.notify_all();
    }
    LockerC Locker() { return LockerC(*this); }
private:
    class LockerC {
    public:
        LockerC() = delete;
        LockerC(AtomicMutex & mux) : pmux_(&mux) { mux.lock(); }
        LockerC(LockerC const & other) = delete;
        LockerC(LockerC && other) : pmux_(other.pmux_) { other.pmux_ = nullptr; }
        ~LockerC() { if (pmux_) pmux_->unlock(); }
        LockerC & operator = (LockerC const & other) = delete;
        LockerC & operator = (LockerC && other) = delete;
    private:
        AtomicMutex * pmux_ = nullptr;
    };
    
    std::atomic_flag f_ = ATOMIC_FLAG_INIT;
};

template <typename T>
class RingBuffer {
public:
    RingBuffer() : buf_(1 << 8), last_(buf_.size() - 1) {}
    T & front() { return buf_[first_]; }
    T const & front() const { return buf_[first_]; }
    T & back() { return buf_[last_]; }
    T const & back() const { return buf_[last_]; }
    size_t size() const { return size_; }
    bool empty() const { return size_ == 0; }
    
    template <typename ... Args>
    void emplace(Args && ... args) {
        while (size_ >= buf_.size()) {
            std::rotate(&buf_[0], &buf_[first_], &buf_[buf_.size()]);
            first_ = 0;
            last_ = buf_.size() - 1;
            buf_.resize(buf_.size() * 2);
        }
        ++size_;
        ++last_;
        if (last_ >= buf_.size())
            last_ = 0;
        buf_[last_] = T(std::forward<Args>(args)...);
    }
    
    void pop() {
        if (size_ == 0)
            return;
        --size_;
        ++first_;
        if (first_ >= buf_.size())
            first_ = 0;
    }
private:
    std::vector<T> buf_;
    size_t first_ = 0, last_ = 0, size_ = 0;
};

template <typename T>
class Queue {
public:
    size_t Size() const { return q_.size(); }
    bool Empty() const { return q_.size() == 0; }
    
    template <typename ... Args>
    void Emplace(Args && ... args) {
        auto lock = m_.Locker();
        q_.emplace(std::forward<Args>(args)...);
    }
    
    T Pop(std::function<void()> const & on_empty = []{},
          std::function<void()> const & on_full  = []{}) {
        while (true) {
            if (q_.empty()) {
                on_empty();
                continue;
            }
            auto lock = m_.Locker();
            if (q_.empty()) {
                on_empty();
                continue;
            }
            on_full();
            T val = std::move(q_.front());
            q_.pop();
            return std::move(val);
        }
    }
    
    std::optional<T> TryPop() {
        auto lock = m_.Locker();
        if (q_.empty())
            return std::nullopt;
        T val = std::move(q_.front());
        q_.pop();
        return std::move(val);
    }
private:
    AtomicMutex m_;
    RingBuffer<T> q_;
};

class RunInDestr {
public:
    RunInDestr(std::function<void()> const & f) : f_(f) {}
    ~RunInDestr() { f_(); }
private:
    std::function<void()> const & f_;
};

class Pool {
public:
    struct FinishExc {};
    struct Worker {
        std::unique_ptr<std::atomic<bool>> pdone = std::make_unique<std::atomic<bool>>(true);
        std::unique_ptr<std::exception_ptr> pexc = std::make_unique<std::exception_ptr>();
        std::unique_ptr<std::thread> thr;
    };
    Pool(size_t nthreads = size_t(-1)) {
        if (nthreads == size_t(-1))
            nthreads = std::thread::hardware_concurrency() - 1;
        std::cout << "Pool has " << nthreads << " worker threads." << std::endl;
        for (size_t i = 0; i < nthreads; ++i) {
            workers_.emplace_back(Worker{});
            workers_.back().thr = std::make_unique<std::thread>(
                [&, pdone = workers_.back().pdone.get(), pexc = workers_.back().pexc.get()]{
                    try {
                        std::function<void()> f_done = [pdone]{
                            pdone->store(true, std::memory_order_relaxed);
                        }, f_empty = [this]{
                            CheckFinish();
                        }, f_full = [pdone]{
                            pdone->store(false, std::memory_order_relaxed);
                        };
                        while (true) {
                            RunInDestr set_done(f_done);
                            tasks_.Pop(f_empty, f_full)();
                        }
                    } catch (...) {
                        exc_.store(true, std::memory_order_relaxed);
                        *pexc = std::current_exception();
                    }
                });
        }
    }
    ~Pool() {
        Wait();
        Finish();
    }
    void CheckExc() {
        if (!exc_.load(std::memory_order_relaxed))
            return;
        Finish();
        throw std::runtime_error("Pool: Exception occurred!");
    }
    void Finish() {
        finish_ = true;
        for (auto & w: workers_)
            try {
                w.thr->join();
                if (*w.pexc)
                    std::rethrow_exception(*w.pexc);
            } catch (FinishExc const &) {}
        workers_.clear();
    }
    template <typename ... Args>
    void Emplace(Args && ... args) {
        CheckExc();
        tasks_.Emplace(std::forward<Args>(args)...);
    }
    void Wait() {
        while (true) {
            auto task = tasks_.TryPop();
            if (!task)
                break;
            (*task)();
        }
        
        while (true) {
            bool done = true;
            for (auto & w: workers_)
                if (!w.pdone->load(std::memory_order_relaxed)) {
                    done = false;
                    break;
                }
            if (done)
                break;
        }
        
        CheckExc();
    }
private:
    void CheckFinish() {
        if (finish_)
            throw FinishExc{};
    }
    Queue<std::function<void()>> tasks_;
    std::vector<Worker> workers_;
    bool finish_ = false;
    std::atomic<bool> exc_ = false;
};

template <bool MultiThreaded = true, size_t BitSize>
void ProcessBitset(Pool & pool, std::bitset<BitSize> & bset,
        std::string const & businessLogicCriteria) {
    static size_t constexpr block = BitSize / 16;
    for (int j = 0; j < BitSize; j += block) {
        auto task = [&bset, j]{
            int const hi = std::min(j + block, BitSize);
            for (int i = j; i < hi; ++i) {
                if (i % 2 == 0)
                    bset[i] = 0;
                else
                    bset[i] = 1;
            }
        };
        if constexpr(MultiThreaded)
            pool.Emplace(std::move(task));
        else
            task();
    }
    if constexpr(MultiThreaded)
        pool.Wait();
}

static auto const gtb = std::chrono::high_resolution_clock::now();

double Time() {
    return std::chrono::duration_cast<std::chrono::duration<double>>(
        std::chrono::high_resolution_clock::now() - gtb).count();
}

void Compute() {
    Pool pool;
    
    std::bitset<65536> bset;
    std::string businessLogicCriteria;
    
    int const hi = 50000;
    for (int j = 0; j < hi; ++j) {
        if ((j & 0x1FFF) == 0 || j + 1 >= hi)
            std::cout << j / 1000 << "K (" << std::fixed << std::setprecision(3) << Time() << " sec), " << std::flush;
        ProcessBitset(pool, bset, businessLogicCriteria);
        businessLogicCriteria = "...";
    }
}

void TimeMeasure() {
    size_t constexpr A = 1 << 16, B = 1 << 5;
    {
        Pool pool;
        auto const tb = Time();
        int64_t volatile x = 0;
        for (size_t i = 0; i < A; ++i) {
            for (size_t j = 0; j < B; ++j)
                pool.Emplace([&]{ x = x + 1; });
            pool.Wait();
        }
        std::cout << "AtomicPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
            << " sec, speed " << A * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
            << 1'000'000 / (A * B / (Time() - tb)) << " sec/M-task, no-collisions "
            << std::setprecision(7) << double(x) / (A * B) << std::endl;
    }
    
    {
        auto const tb = Time();
        //size_t const nthr = std::thread::hardware_concurrency();
        size_t constexpr C = A / 8;
        std::vector<std::future<void>> asyncs;
        int64_t volatile x = 0;
        for (size_t i = 0; i < C; ++i) {
            asyncs.clear();
            for (size_t j = 0; j < B; ++j)
                asyncs.emplace_back(std::async(std::launch::async, [&]{ x = x + 1; }));
            asyncs.clear();
        }
        std::cout << "AsyncPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
            << " sec, speed " << C * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
            << 1'000'000 / (C * B / (Time() - tb)) << " sec/M-task, no-collisions "
            << std::setprecision(7) << double(x) / (C * B) << std::endl;
    }
}

int main() {
    try {
        TimeMeasure();
        Compute();
        return 0;
    } catch (std::exception const & ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
        return -1;
    } catch (...) {
        std::cout << "Unknown Exception!" << std::endl;
        return -1;
    }
}

Output for 4 cores (8 hardware threads):

Pool has 7 worker threads.
AtomicPool time 0.903 sec, speed 2321.831 empty K-tasks/sec, 0.431 sec/M-task, no-collisions 0.9999967
AsyncPool time 0.982 sec, speed 266.789 empty K-tasks/sec, 3.750 sec/M-task, no-collisions 0.9999123
Pool has 7 worker threads.
0K (0.074 sec), 8K (0.670 sec), 16K (1.257 sec), 24K (1.852 sec), 32K (2.435 sec), 40K (2.984 sec), 49K (3.650 sec), 49K (3.711 sec),

For comparison, below are the timings of the single-threaded version, which is 6x slower:

0K (0.125 sec), 8K (3.786 sec), 16K (7.754 sec), 24K (11.202 sec), 32K (14.662 sec), 40K (18.056 sec), 49K (21.470 sec), 49K (21.841 sec),

CodePudding user response:

You have this inner loop you want to parallelize:

for (size_t j = 0; j < bits.size(); ++j)
    bits[j] = ModifyBit(i, j, hash, bits[j]);

So a good idea is to split it into chunks and have multiple threads process the chunks in parallel. You can easily submit chunks to workers with a std::atomic<int> counter that increments to identify which chunk to work on. You can also make sure all threads finish one loop before starting the next with a std::barrier:

// Note: a sketch built on the question's code; ModifyBit() and Hash() are
// the question's hypothetical functions. std::barrier requires C++20.
#include <algorithm>  // std::min
#include <atomic>
#include <barrier>
#include <bitset>
#include <cstdint>
#include <thread>

std::bitset<65536> bits{};

std::thread pool[8];  // Change size accordingly
std::atomic<int> task_number{0};
constexpr std::size_t tasks_per_loop = 32;  // Arbitrarily chosen
constexpr std::size_t block_size = (bits.size() + tasks_per_loop - 1) / tasks_per_loop;

// (only written to by one thread by the barrier, so not atomic)
uint64_t hash = 0;
int i = 0;

std::barrier barrier(std::size(pool), [&]() {
    task_number = 0;
    ++i;
    hash = Hash(bits, hash);
});

for (std::thread& t : pool) {
    t = std::thread([&]{
        while (i < 50000) {
            for (int t; (t = task_number++) < tasks_per_loop;) {
                int block_start = t * block_size;
                int block_end = std::min(block_start + block_size, bits.size());
                for (int j = block_start; j < block_end; ++j) {
                    bits[j] = ModifyBit(i, j, hash, bits[j]);
                }
            }

            // Wait for other threads to finish and hash
            // to be calculated before starting next loop
            barrier.arrive_and_wait();
        }
    });
}

for (std::thread& t : pool) t.join();

(The seemingly easy way of parallelizing the for loop with OpenMP's #pragma omp parallel for seemed slower in my testing, perhaps because the tasks were so small.)
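
For completeness, that OpenMP variant might look roughly like the following sketch (ModifyBit and Hash here are stand-ins for the question's hypothetical functions; the static chunk size of 4096 bits, i.e. 64 words, keeps each thread writing to its own 64-bit words of the bitset):

#include <bitset>
#include <cstdint>

// Stand-ins for the question's hypothetical ModifyBit/Hash.
static bool ModifyBit(size_t i, size_t j, uint64_t hash, bool bit) {
    return bit ^ (((i + j + hash) & 1) != 0);
}
static uint64_t Hash(std::bitset<65536> const & bits, uint64_t hash) {
    return hash * 0x9E3779B97F4A7C15ULL + bits.count();
}

int main() {
    std::bitset<65536> bits{};
    uint64_t hash = 0;
    for (size_t i = 0; i < 50000; ++i) {
        // Chunks of 4096 bits = 64 words each, so no two threads ever
        // write into the same underlying 64-bit word of the bitset.
        #pragma omp parallel for schedule(static, 4096)
        for (long long j = 0; j < 65536; ++j)
            bits[j] = ModifyBit(i, j, hash, bits[j]);
        hash = Hash(bits, hash);
    }
}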

Here it is against your implementation running similar code: https://godbolt.org/z/en76Kv4nn

And on my machine, running this a few times with 1 million iterations took 28 to 32 seconds with my approach, versus 44 to 50 seconds with your general thread pool approach (granted, mine is much less general, because it can't execute arbitrary std::function<void()> tasks).
