2

我目前正在编写一个并发队列,同时学习如何使用 C++11 的多线程特性。当消费者调用dequeue()函数并且队列没有条目时,函数应该等待直到另一个线程调用enqueue()。我为此使用了一个condition_variable。我的测试在一些条目和线程上运行良好,但是当我使用更多(多达 100000 个元素,20 个生产者,只有 1 个消费者)时,我在condition_variable::wait函数内部遇到访问冲突:

Unbehandelte Ausnahme bei 0x5A2C7EEC (msvcr110d.dll) 在 Tests.exe: 0xC0000005: Zugriffsverletzung beim Lesen an Position 0xFEEEEF6

我已经坚持了几个小时。我希望你能帮助我。谢谢你。

编码:

// --------------------------------------------------------------------------------------
// Concurrent Queue
// --------------------------------------------------------------------------------------

#pragma once
#include <atomic> // Atomic operations for lock-free operations
#include <mutex>
#include <condition_variable>
using namespace std;


// --------------------------------------------------------------------------------------
// Declarations
// --------------------------------------------------------------------------------------

template<typename T>
class ConcurrentQueue;

template<typename T>
class ConcurrentQueueEntry;


// --------------------------------------------------------------------------------------
// Queue
// --------------------------------------------------------------------------------------

template<typename T>
class ConcurrentQueue {
public:
    ConcurrentQueue();
    ~ConcurrentQueue();

    void enqueue(const T value);
    T try_dequeue();
    T dequeue();

    unsigned long count() const;

private:
    atomic<ConcurrentQueueEntry<T>*> front;
    atomic<ConcurrentQueueEntry<T>*> rear;

    atomic_ulong i_count;

    mutex dequeueWaitMutex;
    condition_variable dequeueWaitCV;
};


// --------------------------------------------------------------------------------------
// Entry
// --------------------------------------------------------------------------------------

template<typename T>
class ConcurrentQueueEntry {
public:
    ConcurrentQueueEntry(T _value);

    T value;
    atomic<ConcurrentQueueEntry<T>*> next;
};


// --------------------------------------------------------------------------------------
// Exception: Queue is empty
// --------------------------------------------------------------------------------------

class EmptyQueueException {};


// --------------------------------------------------------------------------------------
// Constructors and Destructor
// --------------------------------------------------------------------------------------

// Create Queue
template<typename T>
ConcurrentQueue<T>::ConcurrentQueue()
    : front(), rear(), i_count(), dequeueWaitMutex(), dequeueWaitCV()
{
    i_count.store(0);
}

// Delete Queue
template<typename T>
ConcurrentQueue<T>::~ConcurrentQueue()
{
    ConcurrentQueueEntry<T>* previous = this->front.load();
    while(previous != NULL) {
        ConcurrentQueueEntry<T>* next = previous->next.load();
        delete previous;
        previous = next;
    }
}

// Create Entry
template<typename T>
ConcurrentQueueEntry<T>::ConcurrentQueueEntry
    (T _value)
    : value(_value), next(NULL)
{ }


// --------------------------------------------------------------------------------------
// Public Methods
// --------------------------------------------------------------------------------------

// Enqueue
template<typename T>
void ConcurrentQueue<T>::enqueue
    (const T value)
{
    // create, append
    ConcurrentQueueEntry<T>* entry = new ConcurrentQueueEntry<T>(value);
    ConcurrentQueueEntry<T>* former_rear = this->rear.exchange(entry);

    // connect
    if(former_rear == NULL) {
        this->front.store(entry);
    }
    else {
        former_rear->next.store(entry);
    }

    // Add
    ++i_count;
    dequeueWaitCV.notify_one();
}

// Dequeue (aborts if queue is empty)
template<typename T>
T ConcurrentQueue<T>::try_dequeue()
{
    ConcurrentQueueEntry<T>* front = this->front.load();
    while(front != NULL && 
        !this->front.compare_exchange_weak(front, front->next.load())); 

    if(front == NULL)
        throw EmptyQueueException();

    --i_count;
    T value = front->value;
    delete front;
    return value;
}

// Dequeue (waits if queue is empty)
template<typename T>
T ConcurrentQueue<T>::dequeue() {
    while(true) {
        try {
            return this->try_dequeue();
        }
        catch(EmptyQueueException) {
            unique_lock<mutex> lock(dequeueWaitMutex);
            dequeueWaitCV.wait(lock, [&] { return this->count() == 0; });
        }
    }
}

// Count entries
template<typename T>
unsigned long ConcurrentQueue<T>::count() const {
    return this->i_count.load();
}

调用堆栈:

msvcr110d.dll!Concurrency::details::LockQueueNode::IsTicketValid() Zeile 924    C++
msvcr110d.dll!Concurrency::details::LockQueueNode::UpdateQueuePosition(Concurrency::details::LockQueueNode * pPreviousNode) Zeile 811   C++
msvcr110d.dll!Concurrency::critical_section::_Acquire_lock(void * _PLockingNode, bool _FHasExternalNode) Zeile 1193 C++
msvcr110d.dll!Concurrency::critical_section::lock() Zeile 1028  C++
msvcr110d.dll!Concurrency::details::_Condition_variable::wait(Concurrency::critical_section & _Lck) Zeile 576   C++
msvcp110d.dll!do_wait(_Cnd_internal_imp_t * * cond, _Mtx_internal_imp_t * * mtx, const xtime * target) Zeile 47 C++
msvcp110d.dll!_Cnd_wait(_Cnd_internal_imp_t * * cond, _Mtx_internal_imp_t * * mtx) Zeile 73 C++
Tests.exe!std::_Cnd_waitX(_Cnd_internal_imp_t * * _Cnd, _Mtx_internal_imp_t * * _Mtx) Zeile 93  C++
Tests.exe!std::condition_variable::wait(std::unique_lock<std::mutex> & _Lck) Zeile 60   C++
Tests.exe!std::condition_variable::wait<<lambda_61c2d1dffb87d02ed418fe62879bb063> >(std::unique_lock<std::mutex> & _Lck, ConcurrentQueue<long>::dequeue::__l7::<lambda_61c2d1dffb87d02ed418fe62879bb063> _Pred) Zeile 67    C++
Tests.exe!ConcurrentQueue<long>::dequeue() Zeile 156    C++
Tests.exe!<lambda_c8c79a4136723f6fef9d0a0557ed768b>::operator()() Zeile 38  C++
Tests.exe!std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil>::operator()() Zeile 1152   C++
Tests.exe!std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> >::_Run(std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> > * _Ln) Zeile 196  C++
Tests.exe!std::_LaunchPad<std::_Bind<0,void,<lambda_c8c79a4136723f6fef9d0a0557ed768b>,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil,std::_Nil> >::_Go() Zeile 187 C++
msvcp110d.dll!_Call_func(void * _Data) Zeile 52 C++
msvcr110d.dll!_callthreadstartex() Zeile 354    C
msvcr110d.dll!_threadstartex(void * ptd) Zeile 337  C
kernel32.dll!747f850d() Unbekannt
[Unten angegebene Rahmen sind möglicherweise nicht korrekt und/oder fehlen, keine Symbole geladen für kernel32.dll] 
ntdll.dll!7719bf39()    Unbekannt
ntdll.dll!7719bf0c()    Unbekannt
4

1 回答 1

4

调试前有两个注意事项:

  • 0xFEEEFEEE6 是 0xFEEEFEEE + 8。0xFEEEFEEE 是调试运行时存储在某些变量中的信号值。我认为这个值表明拥有对象的析构函数已经被调用。相比之下,在初始化之前,我认为该值为 0xCDCDCDCD 。
  • 您可以配置 VS 以关闭国际化。这样你会得到英文错误信息,更适合这里的论坛。

现在,关于您的代码,第一件事是这不是“代码”,而只是其中的一部分。请提炼一个最小的例子。但是,与此同时,您可以使您的队列类不可复制和不可分配(使用“.. = delete;”)。如果我猜对了,那么您将收到来自复制队列的编译错误,可能是在线程启动期间。

于 2013-06-23T08:59:42.617 回答