它们存在吗?
*添加以澄清:
是否有现成的库,用 C++ 模板实现了无锁的 ObjectPool(http://en.wikipedia.org/wiki/Object_pool_pattern)?这里的“无锁”是指它是线程安全的,内部可以使用自旋锁或其他轻量级同步手段。
它们存在吗?
*添加以澄清:
是否有现成的库,用 C++ 模板实现了无锁的 ObjectPool(http://en.wikipedia.org/wiki/Object_pool_pattern)?这里的“无锁”是指它是线程安全的,内部可以使用自旋锁或其他轻量级同步手段。
我最终自己编写了一个对象池,它线程安全、无锁,并且可以随核心数扩展。基准测试结果如下:
在 Intel Core 2 Quad 2.4 GHz、win7-x64 上使用 4 个线程,每秒可以执行 1660 万次借出/归还操作。
`
// Cache-line width in bytes assumed by this code; it sizes the padding
// arrays that keep hot members on separate cache lines.
#define CACHE_LINE_SIZE 64
// MSVC attribute: align the annotated type to a cache-line boundary.
#define alignCache __declspec(align(CACHE_LINE_SIZE))
// MSVC attribute: align to 8 bytes when _WIN64 is defined, 4 bytes otherwise.
#if defined(_WIN64)
#  define alignArch __declspec(align(8))
#else
#  define alignArch __declspec(align(4))
#endif
class InterlockedFlag {
protected:
alignArch volatile unsigned int value;
public:
inline void set(unsigned int val) {
this->value = val;
}
inline unsigned int exchange(unsigned int val) {
return InterlockedExchange(&this->value,val);
}
};
// Pack the node to 1 byte so `data` sits exactly sizeof(pointer) bytes after
// the start of the node; ObjectPool::returnObj relies on that offset to
// recover the node from a pointer to its payload.
#pragma pack(push,1)
/**
 * Singly linked node that carries one pooled object.
 *
 * @tparam T type of the pooled object; it is default-initialised in place.
 */
template <typename T> struct ObjectPoolNode {
    ObjectPoolNode<T>* next;   // successor in the owning list, or nullptr
    T data;                    // the pooled object handed out to callers

    ObjectPoolNode() : next(nullptr) {
    }
};
// Restore the packing that was in effect before the push. The previous
// `#pragma pack(pop,1)` did not do that: MSVC resets the packing to 1 after
// popping, and GCC ignores the malformed directive, so in both cases 1-byte
// packing leaked into every declaration that followed.
#pragma pack(pop)
/**
 * One sub-queue of an ObjectPool: a singly linked chain of ObjectPoolNode<T>
 * with separate consumer and producer ends.
 *
 * The list allocates count+1 nodes; the extra node means `first` always has
 * a node to point at even when nothing is available. Every node the list
 * allocated is remembered in `storage`, which is what the destructor frees.
 * Each hot member is followed by padding up to CACHE_LINE_SIZE bytes.
 *
 * @tparam T type of the pooled objects.
 */
template <typename T> struct alignCache ObjectPoolList {
    ObjectPoolList<T>* nextList;      // next list in the pool's ring (set by the owner)
    char pad1[CACHE_LINE_SIZE - sizeof(ObjectPoolList<T>*)];
    ObjectPoolNode<T>* first;         // consumer end: next node to hand out
    char pad2[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>*)];
    InterlockedFlag consumerLock;     // taken while `first` is being advanced
    char pad3[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    ObjectPoolNode<T>* last;          // producer end: node that returned nodes are appended to
    char pad4[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>*)];
    InterlockedFlag producerLock;     // taken while `last` is being advanced
    char pad5[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    ObjectPoolNode<T>** storage;      // every node allocated by this list (count+1 entries)
    char pad6[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>**)];
    size_t available;                 // number of objects currently borrowable from this list
    size_t count;                     // number of objects this list was created with

    /**
     * Allocates count+1 nodes and chains them into one list.
     *
     * @param count number of objects initially available from this list.
     */
    ObjectPoolList(size_t count)
        : nextList(nullptr), first(nullptr), last(nullptr), storage(nullptr),
          available(count), count(count)
    {
        // Clear both locks through set() so this does not depend on
        // InterlockedFlag offering a converting constructor.
        this->consumerLock.set(false);
        this->producerLock.set(false);

        this->storage = new ObjectPoolNode<T>*[count+1];
        for(size_t i=0 ; i<count+1 ; i++) {
            this->storage[i] = new ObjectPoolNode<T>;
        }
        for(size_t i=0 ; i<count ; i++) {
            this->storage[i]->next = this->storage[i+1];
        }
        this->first = this->storage[0];
        this->last = this->storage[count];
    }

    // The list owns raw allocations; a copy would free them twice.
    ObjectPoolList(const ObjectPoolList&) = delete;
    ObjectPoolList& operator=(const ObjectPoolList&) = delete;

    /**
     * Frees every node recorded in `storage`, then the storage array itself.
     */
    ~ObjectPoolList() {
        if(this->storage) {
            // Walk the nodes BEFORE resetting `count`: zeroing it first made
            // this loop stop after storage[0] and leaked the other nodes.
            for(size_t i=0 ; i<this->count+1 ; i++) {
                delete this->storage[i];
            }
            delete[] this->storage;
            this->storage = nullptr;
        }
        this->count = 0;
        this->available = 0;
    }
};
template <typename T> class alignCache ObjectPool {
private:
ObjectPoolList<T>** lists;
char pad1[CACHE_LINE_SIZE - sizeof(ObjectPoolList<T>**)];
size_t available;
size_t listCount;
public:
ObjectPool(size_t count,size_t parallelCount = 0) {
this->available = count;
this->listCount = parallelCount;
if(this->listCount == 0) {
this->listCount = getSystemLogicalProcessor(); //default
}
this->lists = new ObjectPoolList<T>*[this->listCount];
for(size_t i=0 ; i<this->listCount ; i++) {
this->lists[i] = new ObjectPoolList<T>(count/this->listCount);
}
for(size_t i=0 ; i<this->listCount-1 ; i++) {
this->lists[i]->nextList = this->lists[i+1];
}
this->lists[this->listCount-1]->nextList = this->lists[0];
}
~ObjectPool() {
if(this->lists) {
for(size_t i=0 ; i<this->listCount ; i++) {
delete this->lists[i];
}
delete[] this->lists;
this->lists = NULL;
}
this->available = 0;
this->listCount = 0;
}
T* borrowObj() {
ObjectPoolList<T>* list = this->lists[0];
while( !list->available || list->consumerLock.exchange(true) ) {
if(!this->available) {
return NULL;
}
list = list->nextList;
}
if(list->first->next) {
ObjectPoolNode<T>* usedNode = list->first;
list->first = list->first->next;
list->available--;
this->available--;
list->consumerLock.set(false);
usedNode->next = nullptr;
return &usedNode->data;
}
list->consumerLock.set(false);
return NULL;
}
void returnObj(T* object) {
ObjectPoolNode<T>* node = (ObjectPoolNode<T>*)(((char*)object) - sizeof(ObjectPoolNode<T>*));
ObjectPoolList<T>* list = this->lists[0];
while( list->producerLock.exchange(true) ) {
list = list->nextList;
}
list->last->next = node;
list->last = node;
list->producerLock.set(false);
list->available++;
this->available++;
}
};
`
您最好的选择是查看Boost.Pool,并为其编写一个无锁分配器/互斥接口。
既然无锁队列已经存在,我认为即使没有现成的无锁池,您也完全可以自己实现一个(几乎)无锁的池。
再结合经典的 tcmalloc 分配器(它可能会加锁,但会尽量避免),我认为您就能接近自己的目标了。