6

我的任务是修改一个同步 C 程序,以便它可以并行运行。目标是让它尽可能便携,因为它是许多人使用的开源程序。因此,我认为最好将程序包装在 C++ 层中,这样我就可以利用可移植的 boost 库。我已经这样做了,一切似乎都按预期工作。

我遇到的问题是决定在线程之间传递消息的最佳方法是什么。幸运的是,该程序的架构是多生产者和单一消费者的架构。更好的是,消息的顺序并不重要。我已经读过单生产者/单消费者 (SPSC) 队列将从这种架构中受益。有多线程编程经验的有什么建议吗?我对这些东西很陌生。此外,任何使用 boost 来实现 SPSC 的代码示例都将不胜感激。

4

3 回答 3

8

下面是我用于协作多任务/多线程库 (MACE) http://bytemaster.github.com/mace/的技术。它具有无锁的好处,除非队列为空。

struct task {
   boost::function<void()> func;
   task* next;
};


boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue;

// this can be called from any thread
void thread::post_task( task* t ) {
     // atomically post the task to the queue.
     task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
     do { t->next = stale_head;
     } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Because only one thread can post the 'first task', only that thread will attempt
   // to aquire the lock and therefore there should be no contention on this lock except
   // when *this thread is about to block on a wait condition.  
    if( !stale_head ) { 
        boost::unique_lock<boost::mutex> lock(task_ready_mutex);
        task_ready.notify_one();
    }
}

// this is the consumer thread.
void process_tasks() {
  while( !done ) {
   // this will atomically pop everything that has been posted so far.
   pending = task_in_queue.exchange(0,boost::memory_order_consume);
   // pending is a linked list in 'reverse post order', so process them
   // from tail to head if you want to maintain order.

   if( !pending ) { // lock scope
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);                
      // check one last time while holding the lock before blocking.
      if( !task_in_queue ) task_ready.wait( lock );
   }
 }
于 2012-08-04T02:14:27.233 回答
2

网上有很多生产者-消费者队列的例子,对多个生产者/消费者来说是安全的。@bytemaster 发布了一个在每条消息中使用链接来消除队列类本身的存储的方法——这是一种很好的方法,我自己在嵌入式作业中使用它。

在队列类必须提供存储的地方,我通常使用大小为 N 的“池队列”,在启动时加载 N 个 *message 类实例。需要通信的线程必须从池中弹出一个*消息,加载它并将其排队。当最终“用完”时,*消息被推回池中。这限制了消息的数量,因此所有队列只需要长度为 N - 无需调整大小,无需 new(),无需 delete(),易于泄漏检测。

于 2012-08-04T07:51:49.413 回答
1

如果只有一个消费者但有多个生产者,那么我将使用一个数组或一些类似数组的数据结构,访问时间为 O(1),其中每个数组槽代表一个生产者消费者队列。单生产者-消费者队列的最大优势在于,您可以在没有任何显式同步机制的情况下使其无锁,从而使其成为多线程环境中非常快速的数据结构。有关单生产者-消费者队列的基本实现,请参见我的答案。

于 2012-08-04T02:09:10.003 回答