我的应用程序中有 6 个线程连续运行。场景是:
一个线程不断获取消息并插入消息队列。其他 4 个线程可以被认为是不断从队列中获取消息并处理它们的工作线程。另一个最终线程填充分析信息。
问题:
现在获取消息线程的睡眠持续时间是 100 毫秒。工作线程为 200 毫秒。当我运行这个应用程序时,消息获取线程正在控制并插入队列,从而增加了堆。工作线程没有机会处理消息并释放它们。最后导致内存不足。
如何管理这种情况,以便为消息获取线程和工作线程提供平等的机会。
提前致谢 :)
我的应用程序中有 6 个线程连续运行。场景是:
一个线程不断获取消息并插入消息队列。其他 4 个线程可以被认为是不断从队列中获取消息并处理它们的工作线程。另一个最终线程填充分析信息。
问题:
现在获取消息线程的睡眠持续时间是 100 毫秒。工作线程为 200 毫秒。当我运行这个应用程序时,消息获取线程正在控制并插入队列,从而增加了堆。工作线程没有机会处理消息并释放它们。最后导致内存不足。
如何管理这种情况,以便为消息获取线程和工作线程提供平等的机会。
提前致谢 :)
您需要向生产者线程添加背压。通常这将通过使用阻塞的消费者-生产者队列来完成。生产者将项目添加到队列中,消费者从队列中取出项目并处理它们。如果队列为空,消费者将阻塞,直到生产者向队列添加内容。如果队列已满,则生产者会阻塞,直到消费者从队列中获取项目。
我经常使用的一种流控制系统是在启动时创建大量消息对象,并且不再创建。*对象存储在线程安全的阻塞“池队列”中并循环,由生产者/s从池中弹出,在其他阻塞队列上排队到消费者/s,然后在“消费”时推回池队列.
这限制了内存使用,提供了流量控制(如果池为空,生产者/s 将阻塞它,直到消息从消费者返回),并消除持续的 new/delete/malloc/free。不需要更复杂和更慢的有界队列,所有队列只需大到足以容纳(已知)最大数量的消息。
使用“经典”阻塞队列不需要任何 Sleep() 调用。
你想使用一个有界队列,当它满时会阻塞试图入队的线程,直到有更多空间可用。
您可以使用tbb中的 concurrent_bounded_queue ,或者简单地使用初始化为最大队列大小的信号量,并在入队时递减并在出队时递增。boost::thread 本身不提供信号量,但您可以使用锁和条件变量来实现它。
你的问题有点模糊,所以我可以给你这些指导而不是代码:
C++ 11 中的线程库拥有实现消费者生产者应用程序所需的一切。但是,如果您无法升级编译器,您也可以使用 boost 线程库。