0

我正在玩boost库和C++。我想创建一个包含生产者、消费者和堆栈的多线程程序。生产者填充堆栈,消费者从堆栈中删除项目(int)。一切正常(pop,push,mutex)但是当我在线程中调用pop/push时,我没有任何效果

我做了这个简单的代码:

#include "stdafx.h"
#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp> 
#include <boost/signals2/mutex.hpp>
#include <ctime>

using namespace std;

/ *
* this class reprents a stack which is proteced by mutex
* Pop and push are executed by one thread each time.
*/
class ProtectedStack{
private : 
stack<int> m_Stack;
boost::signals2::mutex m;

public : 
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){

}
void push(int x){
    m.lock();
    m_Stack.push(x);
    m.unlock();
}

void pop(){
    m.lock();
    //return m_Stack.top();
    if(!m_Stack.empty())
        m_Stack.pop();
    m.unlock(); 
}
int size(){
    return m_Stack.size();
}
bool isEmpty(){
    return m_Stack.empty();
}
int top(){
    return m_Stack.top();
}
};

/*
*The producer is the class that fills the stack. It encapsulate the thread object 
*/

class Producer{
public:
Producer(int number ){
    //create thread here but don't start here
m_Number=number;


}
void fillStack (ProtectedStack& s ) {
    int object = 3; //random value
    s.push(object);
    //cout<<"push object\n";
}

void produce (ProtectedStack & s){
    //call fill within a thread 
    m_Thread = boost::thread(&Producer::fillStack,this, s);  
}

 private :
int m_Number;
boost::thread m_Thread;

};


/* The consumer will consume the products produced by the producer */ 

class Consumer {
private : 
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
    m_Number = n;
}

void remove(ProtectedStack &s ) {

     if(s.isEmpty()){ // if the stack is empty sleep and wait for the producer      to fill the stack
        //cout<<"stack is empty\n";
        boost::posix_time::seconds workTime(1); 
        boost::this_thread::sleep(workTime);
     }
     else{
        s.pop(); //pop it
        //cout<<"pop object\n";

     }

}

void consume (ProtectedStack & s){
    //call remove within a thread 
    m_Thread = boost::thread(&Consumer::remove, this, s);  
}

};


int main(int argc, char* argv[])  
{  



ProtectedStack s;


    Producer p(0);
    p.produce(s);

    Producer p2(1);
    p2.produce(s);

    cout<<"size after production "<<s.size()<<endl;
    Consumer c(0);
    c.consume(s);
    Consumer c2(1);
    c2.consume(s);
    cout<<"size after consumption  "<<s.size()<<endl;

getchar();
return 0;  
}  

在我在 VC++ 2010 / win7 中运行它之后,我得到:0 0

你能帮我理解为什么当我从 main 调用 fillStack 函数时我得到了效果,但是当我从线程调用它时没有任何反应?谢谢

4

3 回答 3

5

正如其他人所指出的,您的示例代码存在几个同步问题:

  • 对 ProtectedStack 的某些成员的调用缺少锁。
  • 主线程可以退出而不允许工作线程加入。
  • 生产者和消费者不会像您期望的那样循环。生产者应该总是(当他们可以)生产,消费者应该在新元素被推入堆栈时继续消费。
  • 主线程上的 cout 很可能在生产者或消费者有机会工作之前执行。

我建议考虑使用条件变量在生产者和消费者之间进行同步。在此处查看生产者/消费者示例:http: //en.cppreference.com/w/cpp/thread/condition_variable 这是 C++11 标准库中的一个相当新的功能,并且从 VS2012 开始支持。在 VS2012 之前,您需要 boost 或使用 Win32 调用。

使用条件变量来解决生产者/消费者问题很好,因为它几乎强制使用互斥锁来锁定共享数据,并且它提供了一种信号机制让消费者知道某些东西已经准备好被消费,所以他们没有那么自旋(这始终是消费者的响应能力和轮询队列的 CPU 使用率之间的权衡)。它本身也是原子的,可以防止线程丢失信号的可能性,如这里解释的那样:https ://en.wikipedia.org/wiki/Sleeping_barber_problem

简要介绍条件变量如何处理此问题...

  • 生产者在不拥有互斥锁的情况下在其线程上执行所有耗时的活动。
  • 生产者锁定互斥体,将它产生的项目添加到全局数据结构(可能是某种队列),释放互斥体并发出信号通知单个消费者去 - 按那个顺序。
  • 等待条件变量的消费者自动重新获取互斥锁,将项目从队列中移除并对其进行一些处理。在此期间,生产者已经在生产一个新项目,但必须等到消费者完成后才能将项目排队。

这将对您的代码产生以下影响:

  • 不再需要 ProtectedStack,一个普通的堆栈/队列数据结构就可以了。
  • 如果您使用的是足够新的编译器,则无需提升 - 删除构建依赖项总是一件好事。

我觉得线程对您来说相当新,所以我只能提供建议,看看其他人如何解决同步问题,因为很难让您全神贯注。对具有多线程和共享数据的环境中发生的事情感到困惑通常会导致诸如死锁之类的问题。

于 2012-10-23T19:30:16.000 回答
2

您的代码的主要问题是您的线程不同步。请记住,默认情况下,线程执行不是有序的,也不是有序的,因此消费者线程实际上可以(并且在您的特定情况下)在任何生产者线程产生任何数据之前完成。

为了确保消费者在生产者完成工作后运行,您需要thread::join()在生产者线程上使用函数,它将停止主线程执行,直到生产者退出:

// Start producers
...

p.m_Thread.join();  // Wait p to complete
p2.m_Thread.join(); // Wait p2 to complete

// Start consumers
...

这可以解决问题,但这可能对典型的生产者-消费者用例不利。

要实现更有用的案例,您需要修复消费者功能。您的消费者函数实际上不会等待生成的数据,如果堆栈为空,它将退出,如果尚未生成数据,则永远不会消耗任何数据。

应该是这样的:

void remove(ProtectedStack &s)
{
   // Place your actual exit condition here,
   // e.g. count of consumed elements or some event
   // raised by producers meaning no more data available etc.
   // For testing/educational purpose it can be just while(true)
   while(!_some_exit_condition_)
   {
      if(s.isEmpty())
      {
          // Second sleeping is too big, use milliseconds instead
          boost::posix_time::milliseconds workTime(1); 
          boost::this_thread::sleep(workTime);               
      }               
      else
      {
         s.pop();
      }
   }
} 

另一个问题是错误的thread构造函数用法:

m_Thread = boost::thread(&Producer::fillStack, this, s);  

引用自Boost.Thread 文档

带参数的线程构造函数

template <class F,class A1,class A2,...> thread(F f,A1 a1,A2 a2,...);

前提条件: F并且每一个都An必须是可复制的或可移动的。

效果:好像thread(boost::bind(f,a1,a2,...))。因此,f 和每个 an 都被复制到内部存储中,以供新线程访问

这意味着您的每个线程都会收到自己的副本,s并且所有修改都不会应用于s本地线程副本。当您按值将对象传递给函数参数时,情况也是如此。您需要s通过引用传递对象 - 使用boost::ref

void produce(ProtectedStack& s)
{
   m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
}

void consume(ProtectedStack& s)
{
   m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
}  

另一个问题是关于您的互斥锁的使用。这不是最好的。

  1. 为什么使用 Signals2 库中的互斥锁?只需boost::mutex从 Boost.Thread 使用并删除对 Signals2 库的不需要的依赖。

  2. 使用 RAII 包装器boost::lock_guard而不是直接lock/unlock调用。

  3. 正如其他人所提到的,您应该锁定所有成员ProtectedStack

样本:

boost::mutex m;

void push(int x)
{ 
   boost::lock_guard<boost::mutex> lock(m);
   m_Stack.push(x);
} 

void pop()
{
   boost::lock_guard<boost::mutex> lock(m);
   if(!m_Stack.empty()) m_Stack.pop();
}              

int size()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.size();
}

bool isEmpty()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.empty();
}

int top()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.top();
}
于 2012-10-23T15:49:27.670 回答
1

在尝试使用之前,您没有检查生产线程是否已执行。你也没有锁定大小/空/顶部......如果容器正在更新,那是不安全的。

于 2012-10-23T13:48:25.953 回答