特别是,我正在寻找一个阻塞队列。C ++ 11中有这样的事情吗?如果没有,我的其他选择是什么?我真的不想再自己下降到线程级别了。太容易出错了。
6 回答
根据微软 Visual C++ 团队的 Diego Dagum 的说法:
一个经常出现的问题(嗯,其中之一)是关于 STL 容器以及它们是否是线程安全的。
在这里用 Stephan 的话来说,现实情况是它们不是,不是作为一个 bug,而是作为一个特性:让每个 STL 容器的每个成员函数都获取一个内部锁会破坏性能。作为一个通用的、高度可重用的库,它实际上也不会提供正确性:放置锁的正确级别取决于程序正在做什么。从这个意义上说,单个成员函数往往不是这样正确的级别。
并行模式库(PPL) 包括几个容器,它们提供对其元素的线程安全访问:
- concurrent_vector 类是允许随机访问任何元素的序列容器类。它支持并发安全追加、元素访问、迭代器访问和迭代器遍历操作。
- concurrent_queue 类是一个序列容器类,它允许对其元素进行先进先出的访问。它启用了一组有限的并发安全操作,例如 push 和 try_pop,仅举几例。
这里有一些样品。
也很有趣:http ://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html 。
C++11 本身不提供并发容器。但是,有库选项。除了已经提到的 PPL,别忘了英特尔 TBB 库。
它有一个并发queue
的hash_map
、、set
和vector
实现。但它不仅是一个线程安全的容器库,它还附带标准算法的并行版本(for-loop、reduce、sort...)。
我很惊讶没有人提到moodycamel::ConcurrentQueue。我们使用它已经有一段时间了,它的表现非常好。特别是它的实现是无锁的,立即带来了巨大的速度。使用它的其他原因(引自官方网站):
C++ 没有那么多成熟的无锁队列。Boost 有一个,但它仅限于具有琐碎的赋值运算符和琐碎的析构函数的对象,例如。英特尔的 TBB 队列不是无锁的,也需要简单的构造函数。有很多学术论文在 C++ 中实现了无锁队列,但可用的源代码很难找到,测试更是如此。
警告:在多个生产者的情况下,弹出元素的顺序不能保证与推送元素的顺序相同(@IgorLevicki),因此如果您需要此保证,请寻找其他选项。
容器的接口根本就不是为了这个目标而设计的。对于他们使用的接口,客户端可见的锁确实是您可以在保证正确性和可预测行为的同时完成此操作的唯一方法。这也将是非常低效的,因为收购的数量会非常高(相对于良好的实施而言)。
解决方案 1
按值传递(如果适用)。
解决方案 2
创建一个简单的螺栓实现集合,您可以使用它们在持有范围锁的同时传递容器(考虑它是伪 C++):
template <typename TCollection>
class t_locked_collection {
public:
t_locked_collection(TCollection& inCollection, t_lock& lock) : collection(inCollection), d_lock(lock), d_nocopy() {
}
TCollection& collection;
// your convenience stuff
private:
t_scope_lock d_lock;
t_nocopy d_nocopy;
};
然后调用者将锁与集合配对,然后您更新接口以在适当的情况下使用(传递)容器类型。这只是穷人的阶级延伸。
这个锁定的容器是一个简单的例子,还有一些其他的变体。这是我选择的路线,因为它确实允许您使用最适合您的程序的粒度级别,即使它不像锁定方法那样透明(在语法上)。调整现有程序也相对容易。与带有内部锁的集合不同,至少它的行为方式是可预测的。
另一种变体是:
template <typename TCollection>
class t_lockable_collection {
public:
// ...
private:
TCollection d_collection;
t_mutex d_mutex;
};
// example:
typedef t_lockable_collection<std::vector<int> > t_lockable_int_vector;
...其中类似于 的类型t_locked_collection
可用于公开基础集合。并不是说这种方法是万无一失的,只是防傻。
我的并发无序映射命名空间并发版本{
template<typename T,typename T1>
class unordered_bucket: private std::unordered_map<T,T1>
{
mutable std::recursive_mutex m_mutex;
public:
T1 &operator [](T a)
{
std::lock_guard<std::recursive_mutex> l(m_mutex);
return std::unordered_map<T,T1>::operator [](a);
}
size_t size() const noexcept {
std::lock_guard<std::recursive_mutex> l(m_mutex);
return std::unordered_map<T,T1>::size();
}
vector<pair<T,T1>> toVector() const
{
std::lock_guard<std::recursive_mutex> l(m_mutex);
vector<pair<T,T1>> ret;
for(const pair<T,T1> &p:*this)
{
ret.push_back(p);
}
return ret;
}
bool find(const T &t) const
{
std::lock_guard<std::recursive_mutex> l(m_mutex);
if(this->std::unordered_map<T,T1>::find(t) == this->end())
return false; //not found
return true;
}
void erase()
{
std::lock_guard<std::recursive_mutex> l(m_mutex);
this->unordered_map<T,T1>::erase(this->begin(),this->end());
}
void erase(const T &t)
{
std::lock_guard<std::recursive_mutex> l(m_mutex);
this->unordered_map<T,T1>::erase(t);
}
};
#define BUCKETCOUNT 10
template<typename T,typename T1>
class ConcurrentMap
{
std::vector<unordered_bucket<T,T1>> m_v;
public:
ConcurrentMap():m_v(BUCKETCOUNT){} //using 10 buckets
T1 &operator [](T a)
{
std::hash<T> h;
return m_v[h(a)%BUCKETCOUNT][a];
}
size_t size() const noexcept {
size_t cnt=0;
for(const unordered_bucket<T,T1> &ub:m_v)
cnt=cnt+ub.size();
return cnt;
}
vector<pair<T,T1>> toVector() const
{
vector<pair<T,T1>> ret;
for(const unordered_bucket<T,T1> &u:m_v)
{
const vector<pair<T,T1>> &data=u.toVector();
ret.insert(ret.end(),data.begin(),data.end());
}
return ret;
}
bool find(const T &t) const
{
for(const unordered_bucket<T,T1> &u:m_v)
if(true == u.find(t))
return true;
return false;
}
void erase()
{
for(unordered_bucket<T,T1> &u:m_v)
u.erase();
}
void erase(const T &t)
{
std::hash<T> h;
unordered_bucket<T,T1> &ub = m_v[h(t)%BUCKETCOUNT];
ub.erase(t);
}
};
}
C++11 中没有并发容器。
但是下面的头类使用 std::deque 提供并发队列、堆栈和优先级容器。
BlockingCollection是一个 C++11 线程安全集合类,它以 .NET BlockingCollection 类为模型。