1

我的应用程序中有 6 个线程连续运行。场景是:

一个线程不断获取消息并插入消息队列。其他 4 个线程可以被认为是不断从队列中获取消息并处理它们的工作线程。另一个最终线程填充分析信息。

问题:

现在获取消息线程的睡眠持续时间是 100 毫秒。工作线程为 200 毫秒。当我运行这个应用程序时,消息获取线程正在控制并插入队列,从而增加了堆。工作线程没有机会处理消息并释放它们。最后导致内存不足。

如何管理这种情况,以便为消息获取线程和工作线程提供平等的机会。

提前致谢 :)

4

4 回答 4

4

您需要向生产者线程添加背压。通常这将通过使用阻塞的消费者-生产者队列来完成。生产者将项目添加到队列中,消费者从队列中取出项目并处理它们。如果队列为空,消费者将阻塞,直到生产者向队列添加内容。如果队列已满,则生产者会阻塞,直到消费者从队列中获取项目。

于 2012-12-19T11:44:07.607 回答
4

我经常使用的一种流控制系统是在启动时创建大量消息对象,并且不再创建。*对象存储在线程安全的阻塞“池队列”中并循环,由生产者/s从池中弹出,在其他阻塞队列上排队到消费者/s,然后在“消费”时推回池队列.

这限制了内存使用,提供了流量控制(如果池为空,生产者/s 将阻塞它,直到消息从消费者返回),并消除持续的 new/delete/malloc/free。不需要更复杂和更慢的有界队列,所有队列只需大到足以容纳(已知)最大数量的消息。

使用“经典”阻塞队列不需要任何 Sleep() 调用。

于 2012-12-19T12:14:03.007 回答
1

你想使用一个有界队列,当它满时会阻塞试图入队的线程,直到有更多空间可用。

您可以使用tbb中的 concurrent_bounded_queue ,或者简单地使用初始化为最大队列大小的信号量,并在入队时递减并在出队时递增。boost::thread 本身不提供信号量,但您可以使用锁和条件变量来实现它。

于 2012-12-19T11:42:49.867 回答
1

你的问题有点模糊,所以我可以给你这些指导而不是代码:

  1. 使用 Mutex 保护相互数据。在多线程消费者生产者问题中,相互数据(程序中的消息)通常存在竞争条件。一个线程试图在互内存位置上写入,而另一个线程试图从同一位置读取。阅读器读取的消息可能已损坏,因为作者在阅读过程中对其进行了覆盖。您可以使用互斥锁锁定共同内存位置。每个线程都应该获取此锁,以便能够读取或修改相互数据。这样,消费者进程将绝对确定数据没有被修改。但是您应该注意,获取此锁可能会阻止生产者线程,因此您应该尽快释放锁。
  2. 使用条件变量通知消费者线程。如果您不使用通知机制,则所有消费者线程都应主动检查将耗尽系统资源的数据生产。消费者线程应该很容易进入睡眠状态,因为他们知道生产者线程会在消息准备好时通知他们。

C++ 11 中的线程库拥有实现消费者生产者应用程序所需的一切。但是,如果您无法升级编译器,您也可以使用 boost 线程库。

于 2012-12-19T13:23:42.703 回答