4

我已经使用 Boost 线程和条件实现了一个基本的线程生产者-消费者(线程 1 = 生产者,线程 2 = 消费者)。我经常无限期地陷入等待() 。我真的看不出这里有什么问题。下面是一些伪代码:

// main class
class Main {
public:
  void AddToQueue(...someData...)
  {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push_back(new QueueItem(...someData...));
    m_cond.notify_one(); 
  }

  void RemoveQueuedItem(...someCond...)
  {
    // i'm wondering if this could cause the trouble?
    boost::mutex::scoped_lock lock(m_mutex);
    // erase a item matching condition (some code not shown,
    // but should be fairly self-explanatory -- IsMatch()
    // simply looks at a flag of QueueItem
    m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(),
      boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end());
  }

  friend void WorkerThread(Main* m);
private:      
  boost::ptr_deque<QueueItem> m_queue;
  boost::mutex m_mutex;
  boost::condition m_cond;
};

// worker thread
void WorkerThread(Main* m)
{
  typedef boost::ptr_deque<QueueItem>::auto_type RelType;
  RelType queueItem;

  while(!shutDown) {
    { // begin mutex scope
      boost::mutex::scoped_lock lock(m->m_mutex);
      while(m->m_queue.empty()) {
        m->m_cond.wait(lock); // <- stuck here forever quite often!
      }
      queueItem = m->m_queue->pop_front(); // pop & take ptr ownership
    } // end mutex scope

    // ... do stuff with queueItem
    // ...
    // ... queueItem is deleted when it leaves scope & we loop around
  }
}

一些附加信息:

  • 使用 Boost v1.44
  • Linux 和 Android 中出现问题;我还不确定它是否发生在 Windows 中

有任何想法吗?

更新:我相信我已经隔离了这个问题。一旦确认,我会进一步更新,希望是明天。

更新 2:事实证明上述代码没有问题。我依赖于 AddToQueue() 的底层 API - 在工作线程中处理数据并将其返回给 API 时,它有一个循环错误,它会再次调用 AddToQueue() ......现在已修复;-)

4

2 回答 2

2

I did something similar recently even though mine uses the STL queue. See if you can pick out from my implementation. As wilx says, you need to wait on the condition. My implementation has maximum limit on the elements in the queue and I use that to wait for the mutex/guard to be freed.

I originally did this on Windows with ability to use Mutex or Critical sections in mind hence the template parameter which you can remove and use boost::mutex directly if it simplifies it for you.

#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>

template <typename T> class Queue :  private boost::noncopyable
{
public:
  // constructor binds the condition object to the Q mutex
  Queue(T & mutex, size_t max_size) :  m_max_size(max_size), m_mutex(mutex){}

  // writes messages to end of Q 
  void put(const Message & msg)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // while Q is full, sleep waiting until something is taken off of it
    while (m_queue.size() == m_max_size)
    {
      cond.wait(guard);
    }

    // ok, room on the queue. 
    // Add the message to the queue
    m_queue.push(msg);

    // Indicate so data can be ready from Q
    cond.notify_one();
  }

  // Read message from front of Q. Message is removed from the Q
  Message get(void)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // If Q is empty, sleep waiting for something to be put onto it
    while (m_queue.empty())
    {
      cond.wait(guard);
    }

    // Q not empty anymore, read the value
    Message msg = m_queue.front();

    // Remove it from the queue
    m_queue.pop();

    // Signal so more data can be added to Q
    cond.notify_one();

    return msg;
  }

  size_t max_size(void) const
  {
    return m_max_size;
  }


private:
  const size_t m_max_size;
  T & m_mutex;
  std::queue<Message> m_queue;
  boost::condition_variable_any cond;
};

This way, you can share the queue across the producer/consumer. Example usage

boost::mutex mutex;

Queue<boost::mutex> q(mutex, 100);

boost::thread_group threads;

threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));

threads.join_all();

With Producer/Consumer defined as below

template <typename T> class Producer
{
public:
   // Queue passed in
   explicit Producer(Queue<T> &q) :  m_queue(q) {}

   void operator()()
   {
   }
}
于 2010-10-14T08:41:58.963 回答
0
m->m_cond.wait(); // <- stuck here forever quite often!

应该:

m->m_cond.wait( lock ); 

您已经完全锁定了您的课程,因为您仍然获得了互斥锁,但您正在等待。其他所有方法都希望获取相同的互斥锁并等待永远不会释放互斥锁的工作人员。

于 2010-10-14T08:54:13.047 回答