0

在重新考虑设计并从稻田中获得一些输入之后,我想出了类似的东西,但我想知道它的正确性,当我运行它时似乎很好......这个想法是预分配的对象继承自以下内容:

struct Node
{
    void* pool;
};

这样,我们在每个分配的对象中注入一个指向它的池的指针,以便稍后释放它。然后我们有:

template<class T, int thesize>
struct MemPool
{
    T* getNext();
    void free(T* ptr);

    struct ThreadLocalMemPool
    {
        T* getNextTL();
        void freeTL();

        int size;
        vector<T*> buffer;
        vector<int> freeList;
        int freeListIdx;
        int bufferIdx;
        ThreadLocalMemPool* nextTlPool; //within a thread's context a linked list
    };

    int size;
    threadlocal ThreadLocalMemPool* tlPool; //one of these per thread
};

所以基本上我说它MemPool<Cat, 100>给了我一个内存池,对于它的每个线程,getNexts它都会实例化一个线程本地内存池。尺寸我在内部四舍五入到最接近的 2 次方,以便轻松取模(为简单起见,我将其省略)。因为getNext()对于每个线程都是本地的,所以它不需要锁定,我尝试使用原子作为释放部分,如下所示:

T* ThreadLocalMemPool::getNextTL()
{
    int iHead = ++bufferIdx % size;
    int iTail = freeListIdx % size;

    if (iHead != iTail)  // If head reaches tail, the free list is empty.
    {
        int & idx = freeList[iHead];
        while (idx == DIRTY) {}
        return buffer[idx];
    }
    else
    {
        bufferIdx--; //we will recheck next time
        if (nextTLPool)
            return nextTLPool->getNextTL();
        else
            //set nextTLPool to a new ThreadLocalMemPool and return getNextTL() from it..
    }
}

void ThreadLocalMemPool::free(T* ptr)
{
    //the outer struct handles calling this in the right ThreadLocalMemPool

    //we compute the index in the pool from which this pool came from by subtracting from
    //its address the address of the first pointer in this guys buffer
    int idx = computeAsInComment(ptr);

    int oldListIdx = atomic_increment_returns_old_value(freeListIdx);
    freeList[oldListIdx % size] = idx;
}

现在,这个想法是freeListIdx总是落后bufferIdx于池中,因为你不能(我假设正确使用)释放超过你分配的更多。对 free 的调用会同步它们将缓冲区索引返回到空闲列表的顺序,并且 getNext 将在循环返回时接收到这一点。我一直在考虑它,并没有看到逻辑上有任何语义上的错误,它看起来是正确的还是有一些微妙的东西可以破坏它?

4

1 回答 1

3

线程安全问题需要锁定。如果您想放松这一点,您需要只有一个线程使用池的约束。如果您使用我将在下面描述的循环空闲列表,则可以将其扩展到两个线程,条件是一个线程负责分配,另一个线程负责释放。

至于在没有任何其他管理的情况下使用向量,这是一个坏主意……一旦您开始变得支离破碎,您的分配就会受到打击。

实现这一点的一个好方法是分配一个大的 T 块。然后制作一个足够大的循环队列以指向这些块中的每一个。那是你的“自由名单”。您可能只是选择使用索引。如果将每个池限制为 65536 个项目,则可以选择 unsigned short 以节省空间(实际上是 65535 以允许高效的循环队列管理)

通过使用循环队列,您可以在不考虑碎片的情况下进行恒定时间分配和释放。您还知道您的池何时已满(即空闲列表为空),您可以创建另一个池。显然,当您创建一个池时,您需要填写空闲列表。

所以你的班级看起来像这样:

template<class T, size_t initSize>
class MemPool
{
    vector<T> poolBuffer;              // The memory pool
    vector<unsigned short> freeList;   // Ring-buffer (indices of free items)
    unsigned short nHead, nTail;       // Ring-buffer management
    int nCount;                        // Number of elements in ring-buffer
    MemPool<T,initSize> *nextPool;     // For expanding memory pool

    // etc...
};

现在,对于锁定。如果您可以访问原子递增和递减指令并且相当小心,则可以使用线程安全维护空闲列表。唯一需要的互斥锁是当你需要分配一个新的内存池时。

我改变了我原来的想法。您有点需要两个原子操作,并且您需要一个保留的索引值(0xffff)来为队列上的非原子操作旋转:

// I'll have the functions atomic_incr() and atomic_decr().  The assumption here
// is that they do the operation and return the value as it was prior to the
// increment/decrement.  I'll also assume they work correctly for both int and
// unsigned short types.
unsigned short atomic_incr( unsigned short & );
int atomic_incr( int & );
int atomic_decr( int & );

所以分配是这样的:

T* alloc()
{
    // Check the queue size.  If it's zero (or less) we need to pass on
    // to the next pool and/or allocate a new one.
    if( nCount <= 0 ) {
        return alloc_extend();
    }

    int count = atomic_decr(nCount);
    if( count <= 0 ) {
        T *mem = alloc_extend();
        atomic_incr(nCount);     // undo
        return mem;
    }

    // We are guaranteed that there is at least 1 element in the list for us.
    // This will overflow naturally to achieve modulo by 65536.  You can only
    // deal with queue sizes that are a power of 2.  If you want 32768 values,
    // for example, you must do this: head &= 0x7fff;
    unsigned short head = atomic_incr(nHead);

    // Spin until the element is valid (use a reference)
    unsigned short & idx = freeList[head];
    while( idx == 0xffff );

    // Grab the pool item, and blitz the index from the queue
    T * mem = &poolBuffer[idx];
    idx = 0xffff;

    return mem;
};

上面使用了一个新的私有成员函数:

T * alloc_extend()
{
    if( nextPool == NULL ) {
        acquire_mutex_here();
        if( nextPool == NULL ) nextPool = new MemPool<T>;
        release_mutex_here();
        if( nextPool == NULL ) return NULL;
    }
    return nextPool->alloc();
}

当你想释放时:

void free(T* mem)
{
    // Find the right pool to free from.
    if( mem < &poolBuffer.front() || mem > &poolBuffer.back() )
    {
        if( nextPool ) nextPool->free(mem);
        return;
    }

    // You might want to maintain a bitset that indicates whether the memory has
    // actually been allocated so you don't corrupt your pool here, but I won't
    // do that in this example...

    // Work out the index.  Hope my pointer arithmetic is correct here.
    unsigned short idx = (unsigned short)(mem - &poolBuffer.front());

    // Push index back onto the queue.  As with alloc(), you might want to
    // use a mask on the tail to achieve modulo.
    int tail = atomic_incr(nTail);
    freeList[tail] = idx;

    // Don't need to check the list size.  We assume everything is sane. =)
    atomic_incr(nCount);
}

Notice I used the value 0xffff, effectively as a NULL. The setting, clearing and spinning on this value are there to prevent a race situation. You cannot guarantee that it is safe to leave old data in the queue when multiple threads may be calling free while you have other threads calling alloc. Your queue will be cycling through, but the data in it might not be set yet.

Instead of indices, of course, you could just use pointers. But that is 4 bytes (or 8 bytes on a 64-bit application) and the memory overhead might not be worth it, depending on the data size you are pooling. Personally, I would use pointers, but for some reason it seemed easier to use indices in this answer.

于 2012-09-17T22:34:05.937 回答