2

我正在尝试编写一个具有以下组件的测试台 -

  1. 数据源 (data_src)
  2. 数据接收器 (data_sink)

数据源和数据接收器通过称为连接器的数据通道连接在一起。连接器基本上是一个线程安全队列。数据源线程将数据推送到连接器中,数据接收器线程将数据从连接器中弹出。要求是数据接收器线程永远不能向连接器写入超过队列深度数量的样本。连接器的队列部门在连接器的构造中被初始化(参见 Testbed )。显然,数据接收器永远不应该读取空队列。当我运行下面的代码时,我看到了数据的有序性(数据一致性),但是我的实现似乎违反了写入超过队列部门样本数的数据写入限制......也发生了非常奇怪的事情当数据写入线程首先被调度时。它将写入一个队列深度的样本数,然后当读取线程出现时,它会读取 0 个样本,但不会锁定并继续读取写入的数据……最终读取将停止,数据写入线程将不会被调度。 ....有人可以理顺我吗?线程编程并不像想象的那么直观 ;-( 所以这就是程序应该做的 -

  1. 写入线程 - 而队列大小 < 深度;写入数据。
  2. 读取线程 - 而队列大小!= 0;读取数据。

代码编译如下 - % more Makefile EXE="thread_safe_queue"

exe: g++ -g -o $(EXE) $(EXE).cpp -I /home/rmitra1/Eval/boost/include/ -L \
/home/rmitra1/Eval/boost/lib -lboost_thread -lpthread -lboost_system

-------------------------程序代码如下---------- ----------------------

#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <iostream>
#include <queue>
#include <iterator>
#include <algorithm>

using namespace boost;
using namespace std;


///////////////////////////////////////////////////////////
//A transaction is an electrical signal with a value(data)
//in discreet time(t_id)
///////////////////////////////////////////////////////////

typedef struct transaction{
  float data;
  //the t_id is the transaction id; this is used to 
  //synchronize the data being generated by the producer 
  //threads
  int t_id; 
}sample__;


/////////////////////////////////////////////////////////////
//Thread safe queue; from which the connector is derived;
//it guarantees data coherency...
/////////////////////////////////////////////////////////////

template<class T>
class thread_safe_queue{
public:
  typedef typename std::queue<T>::container_type container_type;
  typedef typename std::queue<T>::size_type size_type;
  typedef typename std::queue<T>::value_type value_type;

  thread_safe_queue(){ cout << "Depth :: " << depth << endl;}
  ~thread_safe_queue(){}

private:
  mutable boost::mutex mutex_;
  boost::condition_variable cond_;
  std::queue<T> queue_;

public:
  void push(const T& t){
    {


      boost::mutex::scoped_lock lock(mutex_);
      cout << "Write::Queue Size :: "<< thread_safe_queue<sample__>::size() << "Depth :: " << depth << endl;

      //wait for condition
      while(size() > depth){
        cout <<"Write Stall...."<<endl;
        cond_.wait(lock);
      }

      queue_.push(t);
      //cout << "Push @ " << t.t_id << " * " << t.data << endl; 
    }
    cond_.notify_one(); //notify other thread of addition to queue
  }

  void pop(T& t){


    boost::mutex::scoped_lock lock(mutex_);
    cout << "Read::Queue Size :: "<< thread_safe_queue<sample__>::size() << "Depth :: " << depth << endl;

    //wait for condition
    while(queue_.empty()){
      cout << "Read Stall...."<<endl;
      cond_.wait(lock);
    }

    //cout << "    Pop @ " << t.t_id << " * " << t.data << endl;
    t = queue_.front();
    queue_.pop();
  }

  size_type size() const {
    boost::mutex::scoped_lock lock(mutex_);
    return queue_.size();
  }

protected:
  int depth;



}; //class thread_safe_queue

//////////////////////////////////////////////////////////////
//The connector class models a physical unidirectional attachment
//////////////////////////////////////////////////////////////


class connector : public thread_safe_queue<sample__> {

public:
  connector(int depth) : sample_cnt(0){
    thread_safe_queue<sample__>::depth = depth;
    cout << "Depth :: " << depth << endl;
  }
  ~connector(){}



  //*****************************************************
  //write method
  //*****************************************************
  void write_data(sample__ data_sample)
  {
    //cout << "Write::Queue Size :: "<< thread_safe_queue<sample__>::size() << endl;
      thread_safe_queue<sample__>::push(data_sample);

  }
  //******************************************************
  //read method
  //******************************************************
  sample__ read_data(void)
  {
    sample__ sample;

    //cout << "Read::Queue Size :: "<< thread_safe_queue<sample__>::size() << endl;
    thread_safe_queue<sample__>::pop(sample);
    return(sample);


  }
private:
  int sample_cnt;
  boost::condition_variable cond_;
  mutable boost::mutex mutex_;


};


/////////////////////////////////////////////////////////////////
//Data Source
/////////////////////////////////////////////////////////////////

class data_src{
public:
  connector *this_foo;
  data_src(connector *foo) : this_foo(foo){}
  ~data_src(){}
  void write(connector *foo)
  {
    sample__ in;
    int ii=0;
    while(1){
      in.data = rand();
      in.t_id = ii;
      this_foo->write_data(in);
      ii++;
      //sleep(2);
    }

  }
};//class data_src

////////////////////////////////////////////////////////////////
//Data Sink
////////////////////////////////////////////////////////////////

class data_sink{
public:
  connector *this_foo;
  data_sink(connector *foo) : this_foo(foo){}
  ~data_sink(){}
  void read(connector *foo)
  {
    sample__ out;
    while(1){
      out=this_foo->read_data();    
    }    
  }
};


class run : public boost::thread
{

};

//////////////////////////////////////////////////////////////////
//Testbed.....
//////////////////////////////////////////////////////////////////


int main(int argc, char**argv)
{

  connector *foo = new connector(5);
  data_src  generator(foo);
  data_sink terminal(foo);

  boost::thread __write;
  boost::thread __read;
  __write = boost::thread(&data_src::write, &generator, foo);
  __read  = boost::thread(&data_sink::read, &terminal,  foo);
  __read.join();
  __write.join();

}
4

0 回答 0