2

这是使用VS 2010并发队列的典型生产者/消费者模式,问题是当我运行程序时,内存消耗超过1GB然后程序崩溃,有人可以指出这段代码中的问题吗?

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 

#include <boost\shared_ptr.hpp>
#include <boost\thread.hpp>
#include <concurrent_queue.h>



void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}

class CQueue
{
    Concurrency::concurrent_queue<int>  Q;

    boost::mutex                m;
    boost::condition_variable   cv;

public:

    CQueue():QValue(-1)
    {
    }

    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(m);
            cv.wait(lk);
        }

        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            return true;
        }
        return false;
    }

    void QWrite(int i)
    {
        Q.push(i);
        cv.notify_one();
    }

    int QValue;
};

CQueue myqueue;

void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);
    }
}


void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}
void main ()
{

    boost::thread w(write);
    boost::thread r(read);

    w.join();
    r.join();

}
4

2 回答 2

1

代码会丢失来自您的条件变量的通知,因此您的消费者线程睡眠时间过长,因此消耗的速度不够快。

想象一下可以想象的线程序列:

    Producer                       Consumer
--+-----------------------------+-------------------------------------------------------
1 |                             |  while(Q.empty())
2 |   Q.push(i);                |  boost::unique_lock<boost::mutex> lk(consumerMutex);
3 |   consumerCV.notify_one();  |
4 |                             |  consumerCV.wait(lk); // notification from 3 gets lost

要修复互斥锁,必须在生产者之前发出条件信号时consumerCV.notify_one()和在消费者之前检查队列状态时保持互斥锁Q.empty()

您可以通过注释掉所有互斥锁和条件变量调用并将使用者更改为忙等待来轻松检查while(Q.empty()) /* busy-wait */;

如果concurrent_queue不提供等待项目可用的功能,则最好使用包裹在互斥锁中的非线程安全容器。因为它仍然需要一个互斥锁和一个条件变量来正确通知使用无锁或无等待容器获得的好处会丢失。

此外,由于生产者只是++i为了生产,而消费者通过打印每个值来做更多的事情,消费者不可能跟上生产者的步伐,从而导致队列的增加和最终的内存耗尽。

于 2013-07-30T19:01:44.937 回答
1

我在一个简单的双核上使用 VS'13 和 Boost 1.52 构建并测试了您的代码。

正如已经说过的,由于您的生产者-消费者设计没有定义阈值以在库存(并发队列)达到给定水平时阻止生产者,生产者在队列中推送了太多数据,因此内存增加,windows开始交换,冻结,如果超过最大提交大小,进程可能会崩溃,等等....

请注意,提交大小限制取决于几个因素,编译器,编译器选项,程序运行的操作系统,...

因此,在下面我添加了一种方法来阻止生产者,如果队列大小达到阈值,并且如果队列大小低于阈值,则消费者唤醒生产者。

通过这些更改,我们添加了一些同步,这可能会限制并行性,但使用中的内存是受控的。

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 

#include "..\..\..\boost\boost\shared_ptr.hpp"
#include "..\..\..\boost\boost\thread.hpp"

#include <concurrent_queue.h>

#define STOCK_THRESHOLD 1000

void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}

class CQueue
{
    Concurrency::concurrent_queue<int>  Q;

    boost::mutex                consumerMutex;
    boost::condition_variable   consumerCV;

    boost::mutex                producerMutex;
    boost::condition_variable   producerCV;

public:

    CQueue():QValue(-1)
    {
    }

    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(consumerMutex);
            consumerCV.wait(lk);
        }

        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            if(Q.unsafe_size() <= STOCK_THRESHOLD)
            {
                producerCV.notify_one();
            }
            return true;
        }
        return false;
    }

    void QWrite(int i)
    {
        while(Q.unsafe_size() > STOCK_THRESHOLD){
            boost::unique_lock<boost::mutex> lk(producerMutex);
            producerCV.wait_for(lk, boost::chrono::milliseconds(10));
        }
        Q.push(i);
        consumerCV.notify_one();
    }

    int QValue;
};

CQueue myqueue;

void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);

    }
}


void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}

void main ()
{

    boost::thread w(write);
    boost::thread r(read);

    w.join();
    r.join();

}
于 2013-07-30T18:35:47.407 回答