Home > Enterprise >  Producer/consumer of type <int *>, how to avoid segmentation fault?
Producer/consumer of type <int *>, how to avoid segmentation fault?

Time:09-13

I found a great producer/consumer double buffer example in this question. However, instead of setting T to "int", I want to use "int *". Unfortunately, it doesn't work, and I keep getting segmentation faults when writing to the buffer. The code below is what I've tried. Would anyone mind telling me how to solve that problem?

// Spawns a producer thread that writes the values 0 .. num_elts-1 through the
// pointer slot handed out by buf->start_writing(), then waits for it to finish.
// The consumer half is left commented out, exactly as in the question.
//
// buf      - double buffer whose cells hold `int *`; each cell must already
//            point at writable storage, otherwise `**item = i` is undefined.
// num_elts - number of values the producer writes.
//
// NOTE(review): main() below passes a ProducerConsumerDoubleBuffer<int *>;
// DoubleBuffer is presumably an alias for that class declared elsewhere -- confirm.
void processing(DoubleBuffer<int *> *const buf, int num_elts)
{
    std::thread producer([&]() {
        for (int i = 0; i != num_elts; ++i) {
            int **item = buf->start_writing();

            if (item != nullptr) {      // Always true
                // Writes through the int* stored in the cell, not into the cell itself.
                **item = i;
            }
            buf->end_writing();
        }
    });

    /*
    std::thread consumer([&]() {
        int prev = -1;
        for (int i = 0; i != 10; ++i) {
            int* item = *buf->start_reading();
            std::cout << "Consumer: " << item << std::endl;
            if (item != nullptr) {
                assert(*item > prev);
                prev = *item;
                std::cout << "prev: " << prev << std::endl;
            }
            buf->end_reading();
        }
    });
    */
    producer.join();
    //consumer.join();
}


// Entry point of the question's failing example: points both cells of the
// double buffer at caller-provided storage and runs the producer.
int main(void) {

    // NOTE: neither pointer is ever given storage, so the addresses stored in
    // the buffer below are indeterminate. Writing through them in processing()
    // is undefined behaviour -- this is the segmentation fault being asked about.
    int* buffer_a;
    int* buffer_b;

    int num_elts = 10;

    ProducerConsumerDoubleBuffer<int *>  buf;
    buf.m_buf[0] = buffer_a + num_elts;
    buf.m_buf[1] = buffer_b + num_elts;
    processing(&buf, num_elts);
}

CodePudding user response:

I modified the class from the post you linked so that it works better with pointer types. Assigning to the member variables from outside the class seemed wrong, so I added an allocation (with new[]) to the constructor to avoid the segmentation faults.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <memory>
#include <thread>
#include <type_traits>

// Single-producer / single-consumer double buffer coordinated through one
// atomic state word (see the bit layout documented on m_state below).
//
// T is the type stored in each of the two cells. When T is a pointer type,
// the sized constructor additionally allocates one array per cell and makes
// the cell point at it; those arrays are owned by this object and released
// when it is destroyed.
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    // Creates a buffer with value-initialised cells (nullptr for pointer T)
    // and no backing storage.
    ProducerConsumerDoubleBuffer() : m_buf{}, m_state(0), m_readState(0) { }

    // Creates a buffer for a pointer type T whose two cells each point at a
    // freshly allocated, zero-initialised array of buf_size elements.
    ProducerConsumerDoubleBuffer(const int buf_size) : m_buf{}, m_state(0), m_readState(0) {
        for (int cell = 0; cell != 2; ++cell) {
            // Ownership goes straight into m_storage so the arrays are freed
            // on destruction (and if the second allocation throws).
            m_storage[cell].reset(new Element[buf_size]());
            m_buf[cell] = m_storage[cell].get();
        }
    }

    // Backing arrays, if any, are released by m_storage.
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    // Publishes the value written since start_writing(): sets the active
    // cell's full flag, drops the producer's user count and swaps the active
    // cell if nobody else is using the buffer.
    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        // flag is the active cell's full bit if it is not set yet, else 0.
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    // Releases the cell obtained from start_reading(). Safe to call even if
    // start_reading() returned nullptr, in which case it does nothing.
    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        // The masked flag is 0, 0x8 or 0x10, so it has to be tested against
        // zero; comparing it with 1 could never succeed and would leave a
        // freshly written value unreachable while the producer is idle.
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) != 0) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }
private:
    // Element type of the arrays allocated by the sized constructor
    // (T with one level of pointer removed).
    typedef typename std::remove_pointer<T>::type Element;

    // Owns the arrays the cells point at; empty unless the sized
    // constructor was used. Declared before m_buf so it is constructed first.
    std::unique_ptr<Element[]> m_storage[2];

    // The two cells handed out by start_writing() / start_reading().
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    // Snapshot of m_state taken by start_reading() and consulted again by
    // end_reading(); only touched by the reading side.
    std::uint32_t m_readState;
};



// Runs one producer and one consumer thread against *buf and waits for both.
//
// The producer writes 0 .. num_elts-1 through the int* held in the cell it is
// given. The consumer polls num_elts times; whenever a value is available it
// asserts that the value is greater than the previous one it saw, then prints it.
//
// buf      - double buffer whose cells point at writable int storage.
// num_elts - number of producer writes and of consumer polling attempts.
void processing(ProducerConsumerDoubleBuffer<int *> *const buf, int num_elts)
{
    std::thread producer([&]() {
        for (int i = 0; i != num_elts; ++i) {
            int **item = buf->start_writing();

            if (item != nullptr) {      // Always true
                **item = i;
            }
            buf->end_writing();
        }
    });

    std::thread consumer([&]() {
        int prev = -1;
        for (int i = 0; i != num_elts; ++i) {
            int** item = buf->start_reading();
            //std::cout << "Consumer: " << item << std::endl;
            if (item != nullptr) {
                // Read the published value once and reuse it below.
                const int value = **item;
                assert(value > prev);
                prev = value;
                std::cout << "item: " << value << std::endl;
            }
            // end_reading() is a no-op when start_reading() returned nullptr.
            buf->end_reading();
        }
    });
    producer.join();
    consumer.join();
}


int main(void) {
    int num_elts = 500000;

    ProducerConsumerDoubleBuffer<int *>  buf(num_elts);
    processing(&buf, num_elts);
}   
  • Related