2

我需要一个具有 timeout-capable 的 C++ 阻塞队列offer()。该队列适用于多个生产者,一个消费者。回到我实施的时候,我没有找到任何适合这个需求的好的现有队列,所以我自己编写了代码。

我看到take()队列中的方法出现了段错误,但它们是间歇性的。我一直在查看问题的代码,但我没有看到任何看起来有问题的东西。

我想知道是否:

  • 有一个现有的库可以可靠地执行此操作,我应该使用(首选提升或仅标头)。
  • 任何人都可以在我的代码中看到我需要修复的任何明显缺陷。

这是标题:

class BlockingQueue
{
    public:
        BlockingQueue(unsigned int capacity) : capacity(capacity) { };
        bool offer(const MyType & myType, unsigned int timeoutMillis);
        MyType take();
        void put(const MyType & myType);
        unsigned int getCapacity();
        unsigned int getCount();

    private:
         std::deque<MyType> queue;
         unsigned int capacity;
};

以及相关的实现:

boost::condition_variable cond;
boost::mutex mut;

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
    Timer timer;

    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());

        if (monitorTimeout <= 0)
        {
            return false;
        }

        if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
        {
            return false;
        }
    }

    cond.notify_all();

    queue.push_back(myType);

    return true;
}

void BlockingQueue::put(const MyType & myType)
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }

    cond.notify_all();

    queue.push_back(myType);
}

MyType BlockingQueue::take()
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);

    while (queue.size() == 0)
    {
        cond.wait(lock);
    }

    cond.notify_one();

    MyType myType = this->queue.front();

    this->queue.pop_front();

    return myType;
}

unsigned int BlockingQueue::getCapacity()
{
    return this->capacity;
}

unsigned int BlockingQueue::getCount()
{
    return this->queue.size();
}

是的,我没有使用模板来实现这个类——这是列表中的下一个:)

任何帮助是极大的赞赏。线程问题真的很难确定。

-本

4

2 回答 2

0

我想您代码中的问题是通过多个线程修改双端队列。看:

  1. 您正在等待来自另一个线程的编码;
  2. 然后立即向其他线程发送信号,说明 deque 在您要修改它之前已解锁;
  3. 然后你修改双端队列,而其他线程认为双端队列已经解锁并开始做同样的事情。

因此,尝试cond.notify_*()在修改双端队列后放置所有内容。IE:

void BlockingQueue::put(const MyType & myType)
{
    boost::unique_lock<boost::mutex> lock(mut);
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }

    queue.push_back(myType);  // <- modify first

    cond.notify_all();        // <- then say to others that deque is free
}

为了更好地理解,我建议阅读有关pthread_cond_wait().

于 2015-02-04T09:10:51.017 回答
0

为什么是 cond 和 mut 全局变量?我希望他们成为您的 BlockingQueue 对象的成员。我不知道还有什么在触及这些东西,但那里可能存在问题。

我也实现了一个 ThreadSafeQueue 作为一个更大项目的一部分:

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

它与您的概念相似,除了入队(又名提供)功能是非阻塞的,因为基本上没有最大容量。为了强制执行容量,我通常在系统初始化时添加一个带有 N 个缓冲区的池,以及在运行时传递消息的队列,这也消除了在运行时分配内存的需要,我认为这是一件好事(我通常从事嵌入式应用程序)。

池和队列之间的唯一区别是池在系统初始化时获得一堆缓冲区。所以你有这样的事情:

ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;

void init()
{
    for (int i = 0; i < NUM_BUFS; i++)
    {
        pool.enqueue(new BufferDataType);
    }
}

然后,当您要发送消息时,请执行以下操作:

void producerA()
{
    BufferDataType *buf;
    if (pool.waitDequeue(buf, timeout) == true)
    {
        initBufWithMyData(buf);
        queue.enqueue(buf);
    }
}

这样,入队功能既快速又简单,但如果池为空,则您将阻塞,直到有人将缓冲区放回池中。这个想法是一些其他线程将阻塞队列,并在处理完缓冲区后将缓冲区返回到池中,如下所示:

void consumer()
{
    BufferDataType *buf;
    if (queue.waitDequeue(buf, timeout) == true)
    {
        processBufferData(buf);
        pool.enqueue(buf);
    }
}

无论如何,看看它,也许它会有所帮助。

于 2013-10-29T22:21:55.860 回答