2

我正在尝试对一个系统进行建模,其中有多个线程产生数据,一个线程消耗数据。诀窍是我不想要一个专用线程来使用数据,因为所有线程都存在于一个池中。相反,我希望其中一个生产者在有工作时清空队列,如果另一个生产者已经在清除队列,则让出。

基本思想是有一个工作队列,以及围绕处理的锁。每个生产者将其有效负载推入队列,然后尝试进入锁。尝试是非阻塞的,并返回 true(已获得锁)或 false(锁由其他人持有)。

如果获得锁,则该线程将处理队列中的所有数据,直到它为空(包括其他生产者在处理期间引入的任何新有效负载)。处理完所有工作后,线程释放锁并退出。

以下是该算法的 C++ 代码:

void Process(ITask *task) {
     // queue is a thread safe implementation of a regular queue
     queue.push(task);

     // crit_sec is some handle to a critical section like object
     // try_scoped_lock uses RAII to attempt to acquire the lock in the constructor
     //                 if the lock was acquired, it will release the lock in the
     //                 destructor
     try_scoped_lock lock(crit_sec);

     // See if this thread won the lottery. Prize is doing all of the dishes
     if (!lock.Acquired())
        return;

     // This thread got the lock, so it needs to do the work
     ITask *currTask;
     while (queue.try_pop(currTask)) {
          ... execute task ...
     }
}

总的来说,这段代码运行良好,我从未亲眼目睹我将在下面描述的行为,但这种实现让我感到不安。理所当然地,在线程退出 while 循环和释放临界区之间引入了竞争条件。

整个算法依赖于这样的假设:如果锁被持有,那么一个线程正在为队列服务。

我本质上是在寻找对 2 个问题的启发:

  1. 我是否正确地说存在所描述的比赛条件(其他比赛的奖金)
  2. 是否有一种标准模式来实现这种高性能且不引入竞争条件的机制?
4

1 回答 1

2

是的,有一个竞争条件。

线程 A 添加一个任务,获取lock,自行处理,然后从queue. 它被拒绝。

此时线程 B 将任务添加到queue. 然后它尝试获取锁,但失败了,因为线程 A 拥有锁。线程 B 退出。

线程 A 然后退出,queue非空,没有人处理它上面的任务。

这将很难找到,因为那个窗口相对狭窄。为了让它更有可能找到,在while循环之后引入“睡眠 10 秒”。在调用代码中,插入一个任务,等待 5 秒,然后插入第二个任务。再过 10 秒后,检查两个插入任务是否都已完成,并且queue.

解决此问题的一种方法是更改try_pop​​为try_pop_or_unlock,并将您的传递lock给它。 try_pop_or_unlock然后自动检查是否为空queue,如果是则解锁lock并返回 false。

另一种方法是改进线程池。添加一个基于计数信号量的“消费”任务启动器。

semaphore_bool bTaskActive;
counting_semaphore counter;

when (counter || !bTaskActive)
  if (bTaskActive)
    return
  bTaskActive = true
  --counter
  launch_task( process_one_off_queue, when_done( [&]{ bTaskActive=false ) );

当计数信号量处于活动状态时,或者被完成的消费任务戳时,如果没有消费任务处于活动状态,它会启动消费任务。

但这只是我的想法。

于 2013-02-28T23:26:53.597 回答