8

I am trying to make a producer/consumer thread situation more efficient by skipping expensive event operations if necessary with something like:

//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free

running = false


// dd item to queue – producer thread(s)

if(cas(running, false, true))
{
  // We effectively obtained a lock on signalling the event
  add_to_queue()
  signal_event()
}
else
{
  // Most of the time if things are busy we should not be signalling the event
  add_to_queue()

  if(cas(running, false, true))
    signal_event()
}

...

// Process queue, single consumer thread

reset_event()

while(1)
{
  wait_for_auto_reset_event() // Preferably IOCP

  for(int i = 0; i &lt SpinCount; ++i)
    process_queue()

  cas(running, true, false)

  if(queue_not_empty())
    if(cas(running, false, true))
      signal_event()
}

Obviously trying to get these things correct is a little tricky(!) so is the above pseudo code correct? A solution that signals the event more than is exactly needed is ok but not one that does so for every item.

4

4 回答 4

2

这属于“停止胡闹,重新开始工作”的子类别,称为“过早优化”。:-)

如果“昂贵”的事件操作占用了很大一部分时间,那么您的设计是错误的,而不是使用生产者/消费者,您应该使用关键部分/互斥锁并从调用线程中完成工作。

如果您真的担心,我建议您对您的应用程序进行概要分析。

更新:

正确答案:

制片人

ProducerAddToQueue(pQueue,pItem){

    EnterCriticalSection(pQueue->pCritSec)
        if(IsQueueEmpty(pQueue)){
            SignalEvent(pQueue->hEvent)
        }

        AddToQueue(pQueue, pItem)
    LeaveCriticalSection(pQueue->pCritSec)
}

消费者

nCheckQuitInterval = 100; // Every 100 ms consumer checks if it should quit.

ConsumerRun(pQueue)
{
    while(!ShouldQuit())
    {
        Item* pCurrentItem = NULL;
        EnterCriticalSection(pQueue-pCritSec);
            if(IsQueueEmpty(pQueue))
            {
                ResetEvent(pQueue->hEvent)
            }
            else
            {
                pCurrentItem = RemoveFromQueue(pQueue);
            }
        LeaveCriticalSection(pQueue->pCritSec);

        if(pCurrentItem){
            ProcessItem(pCurrentItem);
            pCurrentItem = NULL;
        }
        else
        {
            // Wait for items to be added.
            WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
        }

    }
}

笔记:

  • 该事件是手动重置事件。
  • 受临界区保护的操作很快。仅当队列转换为空状态/从空状态转换时,才会设置或重置该事件。它必须在关键部分内设置/重置以避免竞争条件。
  • 这意味着关键部分只保留很短的时间。所以争用将是罕见的。
  • 除非它们被争用,否则临界区不会阻塞。因此上下文切换将很少见。

假设:

  • 这是一个真正的问题,而不是家庭作业。
  • 生产者和消费者将大部分时间花在做其他事情上,即为队列准备好项目,在将它们从队列中移除后进行处理。
  • 如果他们大部分时间都在做实际的队列操作,那么你不应该使用队列。我希望这是显而易见的。
于 2011-03-03T17:37:22.933 回答
0

看了一堆案例,没看出问题。但这有点复杂。我想也许你会遇到 queue_not_empty / add_to_queue 比赛的问题。但看起来两条路径中的后主导 CAS 都涵盖了这种情况。

CAS 很贵(不如信号贵)。如果您希望跳过信号很常见,我会将 CAS 编码如下:

bool cas(variable, old_val, new_val) {
   if (variable != old_val) return false
   asm cmpxchg
}

像这样的无锁结构是Jinx(我从事的产品)非常擅长测试的东西。因此,您可能希望使用 eval 许可证来测试无锁队列和信号优化逻辑。


编辑:也许你可以简化这个逻辑。

running = false 

// add item to queue – producer thread(s) 
add_to_queue()
if (cas(running, false, true)) {
   signal_event()
}

// Process queue, single consumer thread 

reset_event() 

while(1) 
{ 
    wait_for_auto_reset_event() // Preferably IOCP 

   for(int i = 0; i &lt SpinCount; ++i) 
       process_queue() 

   cas(running, true, false)  // this could just be a memory barriered store of false

   if(queue_not_empty()) 
      if(cas(running, false, true)) 
         signal_event() 
} 

现在 cas/信号总是彼此相邻,它们可以移动到子程序中。

于 2010-10-05T21:54:21.160 回答
0

为什么不将 abool与事件相关联?用于cas将其设置为 true,如果cas成功则发出事件信号,因为事件必须已清除。然后服务员可以在等待之前清除标志

bool flag=false;

// producer
add_to_queue();
if(cas(flag,false,true))
{
    signal_event();
}

// consumer
while(true)
{
    while(queue_not_empty())
    {
        process_queue();
    }
    cas(flag,true,false); // clear the flag
    if(queue_is_empty())
        wait_for_auto_reset_event();
}

这样,您仅在队列中没有元素时才等待,并且您只需为每批项目发出一次事件信号。

于 2010-10-08T10:46:59.283 回答
0

我相信,你想在这个问题中实现类似的目标:
WinForms Multithreading: E​​xecute a GUI update only if the previous one has done. 它特定于 C# 和 Winforms,但该结构可能很适合您。

于 2010-12-02T11:42:58.343 回答