2

我正在尝试使用 c++ 中的 pthreads 编写线程安全队列。我的程序在 93% 的时间里都能正常工作。另外 7% 的时间它会吐出垃圾,或者似乎睡着了。我想知道我的队列中是否存在上下文切换会破坏它的缺陷?

// thread-safe queue
// inspired by http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx
// only works with one producer and one consumer
#include <pthread.h>
#include <exception>

template<class T>
class tsqueue
{
    private:
        volatile int m_ReadIndex, m_WriteIndex;
        volatile T *m_Data;
        volatile bool m_Done;
        const int m_Size;
        pthread_mutex_t m_ReadMutex, m_WriteMutex;
        pthread_cond_t m_ReadCond, m_WriteCond;
    public:
        tsqueue(const int &size);
        ~tsqueue();
        void push(const T &elem);
        T pop();
        void terminate();
        bool isDone() const;
};

template <class T>
tsqueue<T>::tsqueue(const int &size) : m_ReadIndex(0), m_WriteIndex(0), m_Size(size), m_Done(false) {
    m_Data = new T[size];
    pthread_mutex_init(&m_ReadMutex, NULL);
    pthread_mutex_init(&m_WriteMutex, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
}

template <class T>
tsqueue<T>::~tsqueue() {
    delete[] m_Data;
    pthread_mutex_destroy(&m_ReadMutex);
    pthread_mutex_destroy(&m_WriteMutex);
    pthread_cond_destroy(&m_ReadCond);
    pthread_cond_destroy(&m_WriteCond);
}


template <class T>
void tsqueue<T>::push(const T &elem) {
    int next = (m_WriteIndex + 1) % m_Size;
    if(next == m_ReadIndex) {
        pthread_mutex_lock(&m_WriteMutex);
        pthread_cond_wait(&m_WriteCond, &m_WriteMutex);
        pthread_mutex_unlock(&m_WriteMutex);
    }
    m_Data[m_WriteIndex] = elem;
    m_WriteIndex = next;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
T tsqueue<T>::pop() {
    if(m_ReadIndex == m_WriteIndex) {
        pthread_mutex_lock(&m_ReadMutex);
        pthread_cond_wait(&m_ReadCond, &m_ReadMutex);
        pthread_mutex_unlock(&m_ReadMutex);
        if(m_Done && m_ReadIndex == m_WriteIndex) throw "queue empty and terminated";
    }
    int next = (m_ReadIndex +1) % m_Size;
    T elem = m_Data[m_ReadIndex];
    m_ReadIndex = next;
    pthread_cond_signal(&m_WriteCond);
    return elem;
}

template <class T>
void tsqueue<T>::terminate() {
    m_Done = true;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
bool tsqueue<T>::isDone() const {
    return (m_Done && m_ReadIndex == m_WriteIndex);
}

这可以像这样使用:

// thread 1
while(cin.get(c)) {
    queue1.push(c);
}
queue1.terminate();


// thread 2
while(!queue1.isDone()) {
    try{ c = queue1.pop(); }
    catch(char const* str){break;}
    cout.put(c);
}

如果有人看到这个问题,请说出来:)

4

5 回答 5

8

是的,这里肯定有问题。您对队列成员变量的所有访问都发生在互斥锁之外。实际上,我不完全确定您的互斥锁在保护什么,因为它们只是在等待条件变量。

此外,您的读取器和写入器似乎将始终以锁步方式运行,绝不允许队列的大小超过一个元素。

于 2009-02-12T03:07:35.813 回答
3

如果这是您的实际代码,那么一个问题就是您正在初始化m_WriteCond两次,而根本没有初始化m_ReadCond

于 2009-02-12T03:05:51.030 回答
2

你应该把这个类当作一个监视器。您应该为每个队列(普通互斥锁)设置一个“监视器锁”。每当您输入一个读取或写入队列中任何字段的方法时,您应该在输入后立即锁定该互斥锁。这可以防止多个线程一次与队列交互。您应该在等待条件之前和离开方法时释放锁,以便其他线程可以进入。完成等待条件后,请确保重新获取锁。

于 2009-02-12T03:14:44.190 回答
1

If you want anything with decent performance I would strongly suggest dumping your R/W lock and just use a very simple spinlock. Or if you really think you can get the performance you want with R/W lock, i would roll your own based on this design(single word R/W Spinlock) from Joe Duffy.

于 2009-02-12T15:33:54.477 回答
0

似乎问题在于您有一个竞争条件,即线程 2 可以在线程 1 执行任何 cin.get(c) 之前运行。需要确保数据已初始化,并且当您获取信息时,您需要确保在未输入数据的情况下您正在做某事。

也许这是我没有看到完成此操作的其余代码。

于 2009-02-12T03:11:55.587 回答