我目前正在编写一个并发队列,同时学习如何使用 C++11 的多线程特性。当消费者调用dequeue()
函数并且队列没有条目时,函数应该等待直到另一个线程调用enqueue()
。我为此使用了一个condition_variable
。我的测试在一些条目和线程上运行良好,但是当我使用更多(多达 100000 个元素,20 个生产者,只有 1 个消费者)时,我在condition_variable::wait
函数内部遇到访问冲突:
Unbehandelte Ausnahme bei 0x5A2C7EEC (msvcr110d.dll) 在 Tests.exe: 0xC0000005: Zugriffsverletzung beim Lesen an Position 0xFEEEEF6
我已经坚持了几个小时。我希望你能帮助我。谢谢你。
编码:
// --------------------------------------------------------------------------------------
// Concurrent Queue
// --------------------------------------------------------------------------------------
#pragma once
#include <atomic> // Atomic operations for lock-free operations
#include <mutex>
#include <condition_variable>
using namespace std;
// --------------------------------------------------------------------------------------
// Declarations
// --------------------------------------------------------------------------------------
template<typename T>
class ConcurrentQueue;
template<typename T>
class ConcurrentQueueEntry;
// --------------------------------------------------------------------------------------
// Queue
// --------------------------------------------------------------------------------------
template<typename T>
class ConcurrentQueue {
public:
ConcurrentQueue();
~ConcurrentQueue();
void enqueue(const T value);
T try_dequeue();
T dequeue();
unsigned long count() const;
private:
atomic<ConcurrentQueueEntry<T>*> front;
atomic<ConcurrentQueueEntry<T>*> rear;
atomic_ulong i_count;
mutex dequeueWaitMutex;
condition_variable dequeueWaitCV;
};
// --------------------------------------------------------------------------------------
// Entry
// --------------------------------------------------------------------------------------
template<typename T>
class ConcurrentQueueEntry {
public:
ConcurrentQueueEntry(T _value);
T value;
atomic<ConcurrentQueueEntry<T>*> next;
};
// --------------------------------------------------------------------------------------
// Exception: Queue is empty
// --------------------------------------------------------------------------------------
class EmptyQueueException {};
// --------------------------------------------------------------------------------------
// Constructors and Destructor
// --------------------------------------------------------------------------------------
// Create Queue
template<typename T>
ConcurrentQueue<T>::ConcurrentQueue()
: front(), rear(), i_count(), dequeueWaitMutex(), dequeueWaitCV()
{
i_count.store(0);
}
// Delete Queue
template<typename T>
ConcurrentQueue<T>::~ConcurrentQueue()
{
ConcurrentQueueEntry<T>* previous = this->front.load();
while(previous != NULL) {
ConcurrentQueueEntry<T>* next = previous->next.load();
delete previous;
previous = next;
}
}
// Create Entry
template<typename T>
ConcurrentQueueEntry<T>::ConcurrentQueueEntry
(T _value)
: value(_value), next(NULL)
{ }
// --------------------------------------------------------------------------------------
// Public Methods
// --------------------------------------------------------------------------------------
// Enqueue
template<typename T>
void ConcurrentQueue<T>::enqueue
(const T value)
{
// create, append
ConcurrentQueueEntry<T>* entry = new ConcurrentQueueEntry<T>(value);
ConcurrentQueueEntry<T>* former_rear = this->rear.exchange(entry);
// connect
if(former_rear == NULL) {
this->front.store(entry);
}
else {
former_rear->next.store(entry);
}
// Add
++i_count;
dequeueWaitCV.notify_one();
}
// Dequeue (aborts if queue is empty)
template<typename T>
T ConcurrentQueue<T>::try_dequeue()
{
ConcurrentQueueEntry<T>* front = this->front.load();
while(front != NULL &&
!this->front.compare_exchange_weak(front, front->next.load()));
if(front == NULL)
throw EmptyQueueException();
--i_count;
T value = front->value;
delete front;
return value;
}
// Dequeue (waits if queue is empty)
template<typename T>
T ConcurrentQueue<T>::dequeue() {
while(true) {
try {
return this->try_dequeue();
}
catch(EmptyQueueException) {
unique_lock<mutex> lock(dequeueWaitMutex);
dequeueWaitCV.wait(lock, [&] { return this->count() == 0; });
}
}
}
// Count entries
template<typename T>
unsigned long ConcurrentQueue<T>::count() const {
return this->i_count.load();
}
调用堆栈:
msvcr110d.dll!Concurrency::details::LockQueueNode::IsTicketValid() Zeile 924 C++
msvcr110d.dll!Concurrency::details::LockQueueNode::UpdateQueuePosition(Concurrency::details::LockQueueNode * pPreviousNode) Zeile 811 C++
msvcr110d.dll!Concurrency::critical_section::_Acquire_lock(void * _PLockingNode, bool _FHasExternalNode) Zeile 1193 C++
msvcr110d.dll!Concurrency::critical_section::lock() Zeile 1028 C++
msvcr110d.dll!Concurrency::details::_Condition_variable::wait(Concurrency::critical_section & _Lck) Zeile 576 C++
msvcp110d.dll!do_wait(_Cnd_internal_imp_t * * cond, _Mtx_internal_imp_t * * mtx, const xtime * target) Zeile 47 C++
msvcp110d.dll!_Cnd_wait(_Cnd_internal_imp_t * * cond, _Mtx_internal_imp_t * * mtx) Zeile 73 C++
Tests.exe!std::_Cnd_waitX(_Cnd_internal_imp_t * * _Cnd, _Mtx_internal_imp_t * * _Mtx) Zeile 93 C++
Tests.exe!std::condition_variable::wait(std::unique_lock<std::mutex> & _Lck) Zeile 60 C++
Tests.exe!std::condition_variable::wait<<lambda_61c2d1dffb87d02ed418fe62879bb063> >(std::unique_lock<std::mutex> & _Lck, ConcurrentQueue<long>::dequeue::__l7::<lambda_61c2d1dffb87d02ed418fe62879bb063> _Pred) Zeile 67 C++
Tests.exe!ConcurrentQueue<long>::dequeue() Zeile 156 C++
Tests.exe!<lambda_c8c79a4136723f6fef9d0a0557ed768b>::operator()() Zeile 38 C++
Tests.exe!std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil>::operator()() Zeile 1152 C++
Tests.exe!std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> >::_Run(std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> > * _Ln) Zeile 196 C++
Tests.exe!std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> >::_Go() Zeile 187 C++
msvcp110d.dll!_Call_func(void * _Data) Zeile 52 C++
msvcr110d.dll!_callthreadstartex() Zeile 354 C
msvcr110d.dll!_threadstartex(void * ptd) Zeile 337 C
kernel32.dll!747f850d() Unbekannt
[Unten angegebene Rahmen sind möglicherweise nicht korrekt und/oder fehlen, keine Symbole geladen für kernel32.dll]
ntdll.dll!7719bf39() Unbekannt
ntdll.dll!7719bf0c() Unbekannt