4

我正在使用concurrent_bounded_queueIntel TBB 4.1 Update 3 进行生产者和消费者线程之间的通信:

队列类有一个调用方法,该方法abort会抛出tbb::user_abort阻塞在队列实例上poppush队列实例上的所有线程。两个线程之间的通信可能如下所示:

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
q.pop      |  no more data!
(wait)     |  q.abort
quit       |  quit

不幸的是,即使在这个简单的示例中,我也无法使用它来可靠地关闭队列,因为如果某些消费者pop在调用之前没有完成对先前 ped 数据的处理abort,他们将完成迭代并返回阻塞pop

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
process    |  no more data!
process    |  q.abort
process    |  quit
process    |
q.pop      |
(wait)     |
(wait)     |
(wait)     |
(so lonely)|

现在我正在使用一个中度恶心的 hack,它产生另一个非分离线程(它加入消费者池线程)并等待它完成,同时abort不时为后来者发送更多 s:

bool areConsumerThreadsJoinedThankYou = false;
std::thread joiner(Joiner(consumerPool, &areConsumerThreadsJoinedThankYou));

while (!areConsumerThreadsJoinedThankYou) {
    rawQueue.abort();
    MAGIC_MSLEEP(100);
}

class Joiner的实现差不多

void Joiner::operator()(void)
{
    for (auto it = this->m_threadPool.begin();
         it < this->m_threadPool.end();
         it++)
        (*it)->join();
    this->m_done = true;
    *(this->m_flag) = true;
}

这当然是非常丑陋的。有没有更根本的解决方案?

4

1 回答 1

6

创建一个指定的“EndOfData”项。如果您知道您有 K 个消费者,则让生产者在完成推送数据项后推送 K“EndOfData”项。让每个消费者在弹出“EndOfData”项目后退出。

如果事先不知道 K,让生产者推送一个“EndOfData”项。然后让每个弹出“EndOfData”项目的消费者在离开之前推送另一个“EndOfData”项目。在所有消费者完成后,将剩下一个“EndOfData”项,该项将在队列被销毁时被销毁。

于 2013-06-07T14:39:15.900 回答