Thread-safe lockless memory pool: free function does not behave correctly in multi-thread

I have a simple implementation of a thread-safe allocator of same-sized buffers. Internally, the implementation is a very simple interlocked singly-linked list that uses the unused space in unallocated buffers to maintain the list.

I also wrote some tests that exercise the code; in single-threaded mode everything seems to be OK. I managed to isolate the problem to the Free function, but I just can't seem to find it.

I should mention that I ran the same tests with the exact same code using Microsoft's Interlocked Singly Linked Lists, and that version works, but I still want to find out what the problem is with my implementation. I even tried disassembling that code and applying similar intrinsics, but it did not help. (Note also that I don't need to keep track of the number of list entries, which is why I assumed I wouldn't need an interlocked function that exchanges double register-sized operands, such as InterlockedCompareExchange128 on x64.)
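
For reference, the working SList-based variant can look something like this minimal sketch (the class name PoolSList and the stride rounding here are illustrative, not the exact test code). Note that SLIST_HEADER and every SLIST_ENTRY must be aligned to MEMORY_ALLOCATION_ALIGNMENT:

#include <windows.h>
#include <malloc.h> // _aligned_malloc / _aligned_free

class PoolSList {
public:
    PoolSList(size_t n, size_t sizeOfElem)
    {
        if (sizeOfElem < sizeof(SLIST_ENTRY))
        {
            sizeOfElem = sizeof(SLIST_ENTRY);
        }
        // round the element stride up to the required alignment
        stride = (sizeOfElem + MEMORY_ALLOCATION_ALIGNMENT - 1)
            & ~static_cast<size_t>(MEMORY_ALLOCATION_ALIGNMENT - 1);
        arr = static_cast<unsigned char *>(
            _aligned_malloc(n * stride, MEMORY_ALLOCATION_ALIGNMENT));
        InitializeSListHead(&head);
        for (size_t i = 0; n != i; ++i)
        {
            InterlockedPushEntrySList(&head,
                reinterpret_cast<PSLIST_ENTRY>(arr + i * stride));
        }
    }

    ~PoolSList() { _aligned_free(arr); }

    void *Alloc() { return InterlockedPopEntrySList(&head); } // nullptr when empty

    void Free(void *Address)
    {
        InterlockedPushEntrySList(&head, static_cast<PSLIST_ENTRY>(Address));
    }

private:
    SLIST_HEADER head; // already DECLSPEC_ALIGN(16) in winnt.h
    unsigned char *arr = nullptr;
    size_t stride = 0;
};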

Here is the allocator's code:

#ifndef _POOLNOLOCK_HPP_
#define _POOLNOLOCK_HPP_

#include <windows.h>

template<size_t TSizeOfElem>
class PoolNoLock {
public:
    PoolNoLock(size_t N) :
        n(N),
        arr(new ELEMENT[n])
    {
        for (size_t i = 0; (n - 1) > i; ++i)
        {
            arr[i].next = &arr[i + 1];
        }
        arr[n - 1].next = nullptr;

        for (size_t i = 0; n > i; ++i)
        {
            arr[i].allocated = false;
        }
    }

    ~PoolNoLock() { delete[] arr; }

    void *Alloc()
    {
        ELEMENT *allocBuff;

        do
        {
            allocBuff = ptrFree;
            if (!allocBuff)
            {
                return nullptr;
            }
        } while (allocBuff != InterlockedCompareExchangePointer(
            reinterpret_cast<void *volatile *>(&ptrFree),
            allocBuff->next,
            allocBuff
        ));

        if (allocBuff->allocated)
        {
            __debugbreak(); //will break here
        }

        allocBuff->allocated = true;

        return &allocBuff->buff;
    }

    void Free(void *Address)
    {
        ELEMENT *const freeBuff = reinterpret_cast<ELEMENT *>(Address);

        if (!freeBuff->allocated)
        {
            __debugbreak();
        }

        freeBuff->allocated = false;

        ELEMENT *cmpFree = ptrFree;
        do
        {
            freeBuff->next = cmpFree;

            ELEMENT *const xchgFree =
                reinterpret_cast<ELEMENT *>(InterlockedCompareExchangePointer(
                    reinterpret_cast<void *volatile *>(&ptrFree),
                    freeBuff,
                    cmpFree
                ));

            if (xchgFree == cmpFree)
            {
                break;
            }

            cmpFree = xchgFree;
        } while (true);
    }

private:
    typedef struct _ELEMENT {
        union {
            _ELEMENT *next;
            unsigned char buff[TSizeOfElem];
        };
        bool allocated; //debug info
    }ELEMENT;

    const size_t n;
    ELEMENT *const arr; //array of list elements

    ELEMENT *volatile ptrFree = &arr[0]; //head of "singly" linked list
};

#endif // _POOLNOLOCK_HPP_

And this is the code I use to stress-test the class:

  1. 64 is the maximum number of objects that WaitForMultipleObjects can wait for
  2. the wait on the start event in the thread procedure is needed to help achieve a scenario where as many threads as possible are accessing the resource at once
  3. the number of threads spawned is exactly equal to the number of elements in the allocator, which is why the alloc-only test works

#include "PoolNoLock.hpp"
#include <vector>
#include <map>
#include <iostream>

static constexpr size_t N_THREAD = 64;
static constexpr size_t N_TEST_RUN = 4;
static constexpr size_t N_ALLOC_FREE = 1024;

struct ThreadParam {
    PoolNoLock<sizeof(size_t)> *allocator;
    const HANDLE &hStartEvent;
    void *addressAlloc = nullptr;

    ThreadParam(PoolNoLock<sizeof(size_t)> *Allocator, const HANDLE &StartEvent) :
        allocator(Allocator),
        hStartEvent(StartEvent)
    {};
};

template<bool TAllocOnly>
class Test {
public:
    ~Test()
    {
        CloseHandle(hStartEvent);
    }

    bool RunSingle(PoolNoLock<sizeof(size_t)> *Allocator)
    {
        std::vector<ThreadParam> params(N_THREAD, ThreadParam(Allocator, hStartEvent));

        if (TRUE != ResetEvent(hStartEvent))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            handles[i] = CreateThread(nullptr,
                0,
                reinterpret_cast<PTHREAD_START_ROUTINE>(threadProc),
                &params[i],
                CREATE_SUSPENDED,
                &tids[i]);
            if (!handles[i])
            {
                return false;
            }
        }

        for (HANDLE handle : handles)
        {
            if (1 != ResumeThread(handle))
            {
                return false;
            }
        }

        if (TRUE != SetEvent(hStartEvent))
        {
            return false;
        }

        if ((WAIT_OBJECT_0 + N_THREAD - 1) < WaitForMultipleObjects(N_THREAD, handles, TRUE, INFINITE))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            if (WAIT_OBJECT_0 != WaitForSingleObject(handles[i], 0))
            {
                return false;
            }

            DWORD exitCode;
            if (TRUE != GetExitCodeThread(handles[i], &exitCode))
            {
                return false;
            }

            if (0 != exitCode)
            {
                return false;
            }

            if (TRUE != CloseHandle(handles[i]))
            {
                return false;
            }
        }

        if (TAllocOnly)
        {
            std::map<void *, DWORD> threadAllocations;

            for (size_t i = 0; N_THREAD != i; ++i)
            {
                if (!params[i].addressAlloc)
                {
                    return false;
                }

                if (threadAllocations.end() != threadAllocations.find(params[i].addressAlloc))
                {
                    return false;
                }

                std::pair<std::map<void *, DWORD>::iterator, bool> res =
                    threadAllocations.insert(std::make_pair(params[i].addressAlloc, tids[i]));

                if (!res.second)
                {
                    return false;
                }

                Allocator->Free(params[i].addressAlloc);
            }

            if (N_THREAD != threadAllocations.size())
            {
                return false;
            }
        }

        return true;
    }

    bool RunMultiple()
    {
        for (size_t i = 0; N_TEST_RUN != i; ++i)
        {
            PoolNoLock<sizeof(size_t)> allocator(N_THREAD);
            if (!RunSingle(&allocator))
            {
                return false;
            }
        }

        return true;
    }

private:
    const HANDLE hStartEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);

    HANDLE handles[N_THREAD] = { nullptr };
    DWORD tids[N_THREAD] = { 0 };

    static DWORD WINAPI ThreadProcAllocOnly(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        Param->addressAlloc = Param->allocator->Alloc();
        if (!Param->addressAlloc)
        {
            return 3;
        }

        return 0;
    }

    static DWORD WINAPI ThreadProcAllocFree(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        for (size_t i = 0; N_ALLOC_FREE != i; ++i)
        {
            void *ptrTest = Param->allocator->Alloc();
            if (!ptrTest)
            {
                return 3;
            }

            Param->allocator->Free(ptrTest);
        }

        return 0;
    }

    const LPTHREAD_START_ROUTINE threadProc =
        TAllocOnly
        ? reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocOnly)
        : reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocFree);
};

int main()
{
    Test<true> testAllocOnly0;
    Test<false> TestAllocFree0;

    if (!testAllocOnly0.RunMultiple()) //this test will succeed
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc-ONLY tests succeeded" << std::endl;

    if (!TestAllocFree0.RunMultiple()) //this test will fail
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc/free tests succeeded" << std::endl;

    std::cout << "All tests succeeded" << std::endl;

    return 0;
}

CodePudding user response:

Your error is in the Alloc() routine, more exactly in this line:

InterlockedCompareExchangePointer(
    reinterpret_cast<void *volatile *>(&ptrFree),
    allocBuff->next, // <-- !!!
    allocBuff)

There are 2 operations here: first the CPU reads allocBuff->next through the allocBuff pointer, and then it attempts the CAS on ptrFree. These 2 operations are not atomic, and the thread can be preempted between them. By the time you use allocBuff->next, allocBuff may already have been allocated by another thread, and next overwritten with trash (not a valid pointer, for instance).
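
In terms of the question's Alloc(), the race window sits between steps (1)/(2) and (3) in this annotated sketch (the same logic as the original, only spelled out):

ELEMENT *allocBuff = ptrFree;       // (1) read the current head
ELEMENT *next = allocBuff->next;    // (2) read head->next
                                    //     <-- another thread can pop allocBuff,
                                    //         hand it to a user (who overwrites
                                    //         next), and push it back right here
InterlockedCompareExchangePointer(  // (3) the CAS still succeeds, because
    reinterpret_cast<void *volatile *>(&ptrFree),
    next,                           //     ptrFree == allocBuff again, and the
    allocBuff);                     //     garbage next gets published as the head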

So let there be 2 threads, T#1 and T#2:

  • T#1 reads allocBuff from ptrFree
  • T#2 reads allocBuff from ptrFree
  • T#2 wins the CAS and returns allocBuff to its user
  • T#2 (the user) overwrites the content of allocBuff->next with user data, say -1
  • T#1 reads next = allocBuff->next and gets that user data (-1)
  • T#2 frees allocBuff, pushing it back onto ptrFree
  • T#1's CAS succeeds, because ptrFree points to allocBuff again; ptrFree now points to -1
  • T#2 reads -1 from ptrFree on its next Alloc
  • T#2 tries to read through -1
  • crash

Even a single element in the stack and 2 threads are enough to demonstrate this. Take the same example with a head (F) and one element (a); the initial state is F -> a -> 0:

  • T#1 reads a from F
  • T#2 reads a from F
  • T#2 reads 0 from a
  • T#2 writes 0 to F and returns a to its user: F -> 0
  • T#2 (the user) writes -1 into a: a = -1
  • T#1 reads -1 from a
  • T#2 frees a: F -> a -> 0
  • T#1 writes -1 to F and returns a to its user: F -> -1
  • T#2 reads -1 from F
  • T#2 tries to read through -1

Yet another race is possible here - the classic ABA problem. Suppose the list is

F -> a -> b -> c

and you want to pop a and assign F to b. But before you do this, another thread first pops a, so now

F -> b -> c

then it pops b:

F -> c

and then pushes (frees) a back:

F -> a -> c

Because F points to a again, your CAS succeeds and you build the chain

F -> b -> trash

even though b is really in use now.


In any case, your implementation is far from optimal. You don't need templates here: TSizeOfElem and n need to be known only in the init procedure and nowhere else. And a stress test needs not many threads, but delays at the critical points:

void NotifyAllocated(PSINGLE_LIST_ENTRY allocBuff)
{
    allocBuff->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE; // allocated = true;
    
    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), L"alocated = %p", allocBuff);
    
    MessageBoxW(0, txt, sz, MB_ICONINFORMATION); // simulate delay !
}

void NotifyCheck(PVOID buf, PCWSTR fmt)
{
    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), fmt, buf);
    MessageBoxW(0, txt, sz, MB_ICONWARNING); // simulate delay !
}

class PoolNoLock 
{
    PVOID _arr = 0;                     //array of list elements
    PSINGLE_LIST_ENTRY _ptrFree = 0;    //head of "singly" linked list

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do 
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                _ptrFree = ptrFree;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock() 
    { 
        if (PVOID buf = _arr)
        {
            delete[] buf; 
        }
    }

    void *Alloc()
    {
        PSINGLE_LIST_ENTRY allocBuff, ptrFree = _ptrFree;

        for (;;)
        {
            allocBuff = ptrFree;

            if (!allocBuff)
            {
                return 0;
            }

            NotifyCheck(allocBuff, L"try: %p");

            // access allocBuff->Next !!
            PSINGLE_LIST_ENTRY Next = allocBuff->Next;

            NotifyCheck(Next, L"next: %p");

            ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Next, allocBuff);
            //ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, allocBuff->Next, allocBuff);

            if (ptrFree == allocBuff)
            {
                NotifyAllocated(allocBuff);
                return allocBuff;
            }
        }
    }

    void Free(void *Address)
    {
        PSINGLE_LIST_ENTRY ptrFree = _ptrFree, newFree;

        for ( ; ; ptrFree = newFree)
        {
            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree;

            newFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Address, ptrFree);

            if (newFree == ptrFree)
            {
                return ;
            }
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 2;
    do 
    {
        WCHAR sz[16], txt[32];
        swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
        swprintf_s(txt, _countof(txt), L"loop %x", n);
        MessageBoxW(0, txt, sz, MB_OK);
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(1, sizeof(PVOID)))
    {
        ULONG n = 2;
        do 
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}

This is the same as your code, only optimized. The bug is the same - at

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
        (void**)&_ptrFree, allocBuff->Next, allocBuff);

For testing, and to understand it better, it is clearer to write it as:

PSINGLE_LIST_ENTRY Next = allocBuff->Next;

// delay !!

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
    (void**)&_ptrFree, Next, allocBuff);

To solve this problem, SLIST_HEADER is used - it is in fact a combination of the pointer to the stack top + an operation count, compared and exchanged together in one atomic operation, so a stale pointer is caught by its stale count. A correct implementation can look like the following (if you don't use SLIST_HEADER and its API directly):

class PoolNoLock 
{
    PVOID _arr = 0;                     //array of list elements

    struct alignas(16) U // InterlockedCompareExchange128 requires 16-byte alignment
    {
        PSINGLE_LIST_ENTRY ptr; //head of "singly" linked list
        ULONG_PTR allocCount;

        void operator = (U& v)
        {
            ptr = v.ptr;
            allocCount = v.allocCount;
        }

        U(U* v)
        {
            ptr = v->ptr->Next;
            allocCount = v->allocCount + 1;
        }

        U(PSINGLE_LIST_ENTRY ptr, ULONG_PTR allocCount) : ptr(ptr), allocCount(allocCount)
        {
        }

        U() : ptr(0), allocCount(0)
        {
        }

    } u;

    //++ debug
    LONG _allocMiss = 0;
    LONG _freeMiss = 0;
    //-- debug

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do 
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                u.ptr = ptrFree;
                u.allocCount = 0;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock() 
    { 
        if (PVOID buf = _arr)
        {
            delete[] buf; 
        }
    }

    void *Alloc()
    {
        for (;;)
        {
            U allocBuff = u;

            if (!allocBuff.ptr)
            {
                return 0;
            }

            U Next(&allocBuff);

            if (InterlockedCompareExchange128((LONG64*)&u, Next.allocCount, (LONG64)Next.ptr, (LONG64*)&allocBuff))
            {
                // for debug only
                allocBuff.ptr->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE;

                return allocBuff.ptr;
            } 

            // for debug only
            InterlockedIncrementNoFence(&_allocMiss);
        }
    }

    void Free(void *Address)
    {
        for ( ; ; )
        {
            U ptrFree = u;
            U a(reinterpret_cast<PSINGLE_LIST_ENTRY>(Address), ptrFree.allocCount);
            
            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree.ptr;

            if (InterlockedCompareExchange128((LONG64*)&u, a.allocCount, (LONG64)a.ptr, (LONG64*)&ptrFree))
            {
                return ;
            }

            // for debug only
            InterlockedIncrementNoFence(&_freeMiss);
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 0x10000;
    do 
    {
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(16, sizeof(PVOID)))
    {
        ULONG n = 8;
        do 
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}
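
For completeness, the same counter-tagged head can be written with standard C++ std::atomic instead of the Windows intrinsic. This is a minimal sketch under two assumptions: an x64 compiler where the 16-byte compare_exchange is lock-free (cmpxchg16b), and a pool whose memory is never unmapped (Pop dereferences a node that may concurrently belong to a user, exactly as in the code above). The names TaggedPtr and FreeList are illustrative:

#include <atomic>
#include <cstdint>

struct Node { Node *next; };

// The generation counter changes on every pop, so a CAS against a stale
// (pointer, count) pair fails even if the same pointer was pushed back
// in the meantime (the ABA case).
struct alignas(16) TaggedPtr {
    Node *ptr;
    std::uintptr_t count;
};

class FreeList {
public:
    void Push(Node *n)
    {
        TaggedPtr oldHead = head.load();
        TaggedPtr newHead;
        do {
            n->next = oldHead.ptr;
            newHead = { n, oldHead.count }; // the count only changes on pop
        } while (!head.compare_exchange_weak(oldHead, newHead));
    }

    Node *Pop()
    {
        TaggedPtr oldHead = head.load();
        TaggedPtr newHead;
        do {
            if (!oldHead.ptr)
            {
                return nullptr; // list is empty
            }
            // reading ptr->next is safe only because pool memory stays mapped
            newHead = { oldHead.ptr->next, oldHead.count + 1 };
        } while (!head.compare_exchange_weak(oldHead, newHead));
        return oldHead.ptr;
    }

private:
    std::atomic<TaggedPtr> head{ TaggedPtr{ nullptr, 0 } };
};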