1

嗨,快速的问题给定以下适用于生产者/消费者的 c++11 代码,问题是我想关闭 DataQueue 并停止所有消费者。尽管存在问题,因为问题是消费者只需调用 popWait() 并且可以被阻止。在这种情况下,我该如何关闭我的消费者?这可能是一个需要纠正的设计问题。我试图不招致任何性能损失,因为此代码理想情况下应该使用中断模式或类似模式来使队列无锁。既然如此,我想知道是否有什么简单的方法可以让消费者知道在生产者关闭时停止调用 pop 等待函数。棘手的部分是如果队列上仍有数据,您将不得不等待消费者完成提取数据时关闭。我相信我有一个解决方案,消费者可以自己关闭但对想法持开放态度。提前致谢。

#ifndef __DataQueue_h__
#define __DataQueue_h__

#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>

template <typename DataT>
class DataQueue
{
  public:

  DataQueue (): _shutdown(false), _waitTime(5), _itemAvailable() {}

  void push ( const DataT& data )       
  {
    std::unique_lock<std::mutex> lock(_mutex);
    queue.push(data);
    _itemAvailable.notify_one();
  }

// worked fine until I need to shutdown services... then some were blocked
  DataT popWait()
  {
    std::unique_lock<std::mutex> lock(_mutex);

    if(queue.empty())
    {
      _itemAvailable.wait(lock);
    }

    DataT temp(queue.front());
    queue.pop();

    return temp;
  }

  inline void shutdown()
  {
    _shutdown = true;
  }

  private:
  std::queue<DataT> queue;
  bool _shutdown;
  unsigned int _waitTime;
  std::mutex _mutex;
  std::condition_variable _itemAvailable;

};

#endif
4

3 回答 3

1

一种想法是唤醒调用shutdown1的所有消费者。在该popWait方法中,您可以检查从wait.

#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>

template <typename DataT>
class DataQueue
{
public:
    DataQueue (): _shutdown(false), _itemAvailable() {}

    void push ( const DataT& data )       
    {
        std::unique_lock<std::mutex> lock(_mutex);
        queue.push(data);
        _itemAvailable.notify_one();
    }

    Maybe<DataT> popWait()
    {
        std::unique_lock<std::mutex> lock(_mutex);

        while(queue.empty() && !_shutdown)
        {
          _itemAvailable.wait(lock);
        }

        Maybe<DataT> data;
        // leave pending data in the queue
        if (_shutdown) 
        {
            // consumers should stop polling when receiving an 'empty' value
            return data;
        }

        data.add(queue.front());
        queue.pop();   
        return data;
    }

    inline void shutdown()
    {
        _shutdown = true;
        _itemAvailable.notify_all();
    }

private:
    std::queue<DataT> queue;
    std::atomic<bool> _shutdown;
    std::mutex _mutex;
    std::condition_variable _itemAvailable;
};

popWait 的返回值

除了所有同步和信令的东西,你还必须重新考虑popWait. 如果您想实现一个通用shutdown()方法,即不将特殊的标记值填充到队列本身中,则popWait必须能够返回一个表示生产者已停止的“值”——可能是像Maybe<DataT>2这样的模板。我设想Maybe<DataT>要么返回DataT要么什么都没有,在这种情况下消费者将停止轮询。

template<typename DataT>
class Maybe 
{
   DataT _data;
   bool _empty;

pulic:
   Maybe() : _data(), _empty(true) {};

   void add(const DataT& raData)
   {
      _data=raData;
      _empty=false;
   }

   bool isEmpty() const
   {
      return _empty;
   }

   DataT get() const
   {
      return _data;
   }
}

这是一个相当原始的接口。您可以根据需要扩展它。

1 ComicSansMS向我指出,我应该声明_shutdown成员变量std::atmic<bool>以避免内存重新排序问题。感谢您的提醒。

2我刚刚偶然发现std::optional<T>(C++14 中的新功能),这基本上是我的想法。

于 2013-07-02T07:25:51.483 回答
1

您可以将“毒丸”对象推入队列。实际上,推送与您想要关闭的消费者数量一样多。这样,您还将摆脱要检查的关闭成员变量,将此检查卸载到接收线程,从而提高整体排队性能。

这是我为我的线程池实现所做的方法,它工作得很好。

于 2013-07-02T09:51:02.953 回答
0

您可以检查队列是否为popWait空或_shutDown为真。在后一种情况下,您通知调用者他不应再请求数据(抛出异常(在我看来不是最佳的),或重新设计方法签名)。然后你所要做的就是在你的方法notifyAll()中调用。shutdown()

于 2013-07-02T06:28:49.507 回答