我正在编写一个具有事件队列的应用程序。我的意图是以这样一种方式创建它,即多个线程可以写入并且一个线程可以从队列中读取,并将弹出元素的处理交给另一个线程,以便后续再次弹出不会被阻塞。我使用锁和条件变量从队列中推送和弹出项目:
void Publisher::popEvent(boost::shared_ptr<Event>& event) {
boost::mutex::scoped_lock lock(queueMutex);
while(eventQueue.empty())
{
queueConditionVariable.wait(lock);
}
event = eventQueue.front();
eventQueue.pop();
lock.unlock();
}
void Publisher::pushEvent(boost::shared_ptr<Event> event) {
boost::mutex::scoped_lock lock(queueMutex);
eventQueue.push(event);
lock.unlock();
queueConditionVariable.notify_one();
}
在 Publisher 类的构造函数中(仅创建一个实例),我正在启动一个线程,该线程将遍历一个循环,直到捕获 notify_one(),然后启动另一个线程来处理从队列中弹出的事件:
在构造函数中:
publishthreadGroup = boost::shared_ptr<boost::thread_group> (new boost::thread_group());
publishthreadGroup->create_thread(boost::bind(queueProcessor, this));
队列处理器方法:
void queueProcessor(Publisher* agent) {
while(true) {
boost::shared_ptr<Event> event;
agent->getEvent(event);
agent->publishthreadGroup->create_thread(boost::bind(dispatcher, agent, event));
}
}
在dispatcher方法中,完成相关处理,将处理后的信息通过thrift发布到服务器。在程序存在之前调用的另一个方法中,即在主线程中,我调用 join_all() 以便主线程等待线程完成。
在这个实现中,在为调度程序创建线程之后,在上面的while循环中,我遇到了死锁/挂起。运行代码似乎卡住了。这个实现有什么问题?有没有更清洁、更好的方法来做我想做的事情?(多个生产者和一个消费者线程遍历队列并将元素的处理移交给不同的线程)
谢谢!