0

我创建了一个用于多线程应用程序的通用消息队列。具体来说,单一生产者,多消费者。主要代码如下。

1)我想知道我是否应该将一个用 new 分配的 shared_ptr 按值传递给 enqueue 方法,还是让队列包装器自己分配内存并通过 const 引用传入一个 genericMsg 对象更好?

2) 我应该让我的 dequeue 方法返回一个 shared_ptr,让一个 shared_ptr 通过引用作为参数传入(当前策略),还是让它直接返回一个 genericMsg 对象?

3) 我需要在入队/出队时发出信号/等待,还是读/写锁就足够了?

4) 我什至需要使用 shared_ptrs 吗?还是这仅取决于我使用的实现?我喜欢一旦所有引用不再使用该对象,shared_ptrs 将释放内存。不过,如果推荐的话,我可以轻松地将其移植到常规指针。

5)我在这里存储一对,因为我想区分我正在处理的消息类型,否则不必执行any_cast。每个消息类型都有一个唯一的 ID,它引用一个特定的结构。有没有更好的方法来做到这一点?

通用消息类型:

template<typename Message_T>
class genericMsg
{ 
  public:
    genericMsg()
    {
       id = 0;
       size = 0;
    }

    genericMsg (unsigned int &_id, unsigned int &_size, Message_T &_data)
    {
       id = _id;
       size = _size;
       data = _data;
    }

    ~genericMsg()
    {}

    unisgned int   id;
    unsigned int   size;
    Message_T      data; //All structs stored here contain only POD types
};

入队方法:

  // ----------------------------------------------------------------
  // -- Thread safe function that adds a new genericMsg object to the
  // -- back of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  inline void enqueue(boost::shared_ptr< genericMsg<Message_T> > data)
  {
     WriteLock w_lock(myLock);
     this->qData.push_back(std::make_pair(data->id, data));
  }

对比:

  // ----------------------------------------------------------------
  // -- Thread safe function that adds a new genericMsg object to the
  // -- back of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  inline void enqueue(const genericMsg<Message_T> &data_in)
  {
     WriteLock w_lock(myLock);
     boost::shared_ptr< genericMsg<Message_T> > data = 
          new genericMsg<Message_T>(data_in.id, data_in.size, data_in.data);
     this->qData.push_back(std::make_pair(data_in.id, data));
  }

出队方法:

  // ----------------------------------------------------------------
  // -- Thread safe function that grabs a genericMsg object from the
  // -- front of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  void dequeue(boost::shared_ptr< genericMsg<Message_T> > &msg)
  {
     ReadLock r_lock(myLock);
     msg = boost::any_cast< boost::shared_ptr< genericMsg<Message_T> > >(qData.front().second);
     qData.pop_front();
  }

获取消息 ID:

  inline unsigned int getMessageID()
  {
     ReadLock r_lock(myLock);
     unsigned int tempID = qData.front().first;
     return tempID;
  }

数据类型:

 std::deque < std::pair< unsigned int, boost::any> > qData;

编辑:

我改进了我的设计。我现在有一个 genericMessage 基类,我直接从其子类化以派生唯一消息。

通用消息基类:

class genericMessage
{
   public:
      virtual ~genericMessage() {}
      unsigned int getID() {return id;}
      unsigned int getSize() {return size;}

   protected:
      unsigned int id;
      unsigned int size;   
};

制片人片段:

 boost::shared_ptr<genericMessage> tmp (new derived_msg1(MSG1_ID));
 theQueue.enqueue(tmp);

消费者片段:

 boost::shared_ptr<genericMessage> tmp = theQueue.dequeue();         
 if(tmp->getID() == MSG1_ID)
 {
    boost::shared_ptr<derived_msg1> tObj = boost::dynamic_pointer_cast<derived_msg1>(tmp);
    tObj->printData();
 }

新队列:

  std::deque< boost::shared_ptr<genericMessage> > qData;

新入队:

void mq_class::enqueue(const boost::shared_ptr<genericMessage> &data_in)
{
   boost::unique_lock<boost::mutex> lock(mut);
   this->qData.push_back(data_in);
   cond.notify_one();
}

新出队:

boost::shared_ptr<genericMessage> mq_class::dequeue()
{
   boost::shared_ptr<genericMessage> ptr;
   {
      boost::unique_lock<boost::mutex> lock(mut);
      while(qData.empty())
      {
         cond.wait(lock);
      }
      ptr = qData.front();
      qData.pop_front();
   }
   return ptr;
}

现在,我的问题是我是否正确地出队?还有另一种方法吗?在这种情况下,我应该传入一个 shared_ptr 作为参考来实现我想要的吗?

4

3 回答 3

1

编辑(我为第 1、2 和 4 部分添加了答案)。

1) 你应该有一个工厂方法来创建新的 genericMsgs 并返回一个std::unique_ptr. 绝对没有充分的理由通过 const 引用传入 genericMsg,然后让队列将其包装在一个智能指针中:一旦通过引用传递,您就失去了所有权的跟踪,因此如果您这样做,队列将拥有构造(通过复制)要包装的整个 genericMsg。

2)我想不出在什么情况下可以安全地引用 a shared_ptror unique_ptror auto_ptr。shared_ptrs 和 unique_ptrs 用于跟踪所有权,一旦你对它们(或它们的地址)进行了引用,你就不知道还有多少引用或指针期望 shared_ptr/unique_ptr 对象包含一个有效的裸指针。

unique_ptr 总是优于裸指针,并且在一次只有一段代码(有效)指向一个对象的情况下优于 shared_ptr。

3) 是的,你需要std::condition_variable在你的dequeue函数中使用 a 。qData在调用qData.front()或之前,您需要测试是否为空qData.pop_front()。如果qData为空,则需要等待条件变量。当enqueue插入一个项目时,它应该向条件变量发出信号以唤醒可能一直在等待的任何人。

您对读取器/写入器锁的使用是完全不正确的。不要使用读/写锁。使用std::mutex. 阅读器锁只能用于完全const. 您正在修改qDatadequeue因此读卡器锁定将导致那里的数据竞争。(读写器锁仅适用于当您有既是 const长时间持有锁的愚蠢代码时。您只是在从队列中插入或删除所需的时间段内保持锁,所以即使您读取器/写入器锁的const额外开销是否会造成净损失。)

可以在以下位置找到使用互斥锁和条件变量实现(有界)缓冲区的示例:这是在 C++ 中实现有界缓冲区的正确方法

4)unique_ptr总是首选裸指针,通常首选shared_ptr. (shared_ptr 可能更好的主要例外是类似图形的数据结构。)在像你这样的情况下,你正在读一些东西,用工厂创建一个新对象,将所有权移到队列中,然后将所有权移出从队列到消费者,听起来您应该使用 unique_ptr。

5)您正在重新发明标记的 unions。虚函数是专门添加到 c++ 中的,因此您不需要这样做。do_it()您应该从具有称为(或更好,operator()()或类似的东西)的虚函数的类中子类化您的消息。然后,不要标记每个结构,而是使每个结构成为消息类的子类。当您将每个结构(或 ptr 到结构)出列时,只需调用do_it()它即可。强静态类型,无强制转换。有关示例,请参阅涵盖许多共享变量的 C++ std 条件变量。

另外:如果您要坚持使用标记的联合:您不能单独调用来获取 id 和数据项。考虑:如果线程 A 调用获取 id,然后线程 B 调用获取 id,然后线程 B 检索数据项,那么当线程 A 调用检索数据项时会发生什么?它得到一个数据项,但不是它所期望的类型。您需要在同一关键部分下检索 id 和数据项。

于 2013-04-29T20:34:43.053 回答
1

首先,最好使用 3rd-party 并发容器而不是自己实现它们,除非它是用于教育目的。

您的消息看起来没有昂贵的构造函数/析构函数,因此您可以按值存储它们而忘记所有其他问题。使用移动语义(如果可用)进行优化。

如果您的分析器说“按价值”在您的特定情况下是个坏主意:

我想您的生产者创建消息,将它们放入您的队列并且对它们失去任何兴趣。在这种情况下,您不需要shared_ptr,因为您没有共享所有权。您可以使用unique_ptr甚至是原始指针。这是实现细节,最好将它们隐藏在队列中。

从性能的角度来看,最好实现无锁队列。“锁定与信号”完全取决于您的应用程序。例如,如果您使用线程池和某种调度程序,最好让您的客户端在队列满/空时做一些有用的事情。在更简单的情况下,读/写锁就可以了。

于 2013-04-29T20:35:29.227 回答
0

如果我想要线程安全,我通常使用 const 对象并仅在复制或创建构造函数时进行修改。这样,您就不需要使用任何锁定机制。在线程系统中,它通常比mutex在单个实例上使用 es 更有效。

在您的情况下,只deque需要锁定。

于 2013-04-29T20:22:27.193 回答