1

我的基本问题是,如果队列为空,则需要立即处理队列中的项目,或者如果已处理项目,则将项目添加到队列中并离开。

我正在尝试一种使用 peek 来简化事情的技术,并且想知道可能会遇到什么问题。谢谢!

    void SequenceAction(Action action) {
       bool go = false;

       lock (_RaiseEventQueueLock) {
          _RaiseEventQueue.Enqueue(action);
          go = (_RaiseEventQueue.Count == 1); 
       }

       // 'go' can only be true if queue was empty when queue 
       //  was locked and item was enqueued.
       while (go) {
          #if naive_threadsafe_presumption 
          // Peek is threadsafe because in effect this loop owns
          //  the zeroeth item in the queue for as long as the queue 
          //  remains non-empty.
          (_RaiseEventQueue.Peek())();

          #else
          Action a;
          lock (_RaiseEventQueueLock) {
             a = _RaiseEventQueue.Peek();
          }
          a();
          #endif   

          // Lock the queue to see if any item was enqueued while
          //  the zeroeth item was being processed.
          // Note that the code processing an item can call this 
          //  function reentrantly, adding to its own todo list
          //  while insuring that that each item is processed 
          //  to completion.
          lock (_RaiseEventQueueLock) {
             _RaiseEventQueue.Dequeue();
             go = (_RaiseEventQueue.Count > 0); 
          }
       }
    }
4

2 回答 2

1

实际上,您Peek的不是线程安全的。将项目添加到队列可能会导致后备存储(最终是一个数组)被调整大小。我想队列是在循环缓冲区中实现的,带有用于插入和删除的头和尾索引。

所以想象一下如果队列中有 16 个项目会发生什么。Insert 在 8,Remove 在 9。队列已满。然后会发生这种情况:

  1. 线程 A 调用 Peek,检索删除索引 (9)。
  2. 线程 A 被换出。
  3. 线程 B 调用 Enqueue 并看到它必须增加队列。
  4. 线程 B 分配一个包含 32 项的新数组并从现有数组中复制数据。数据按顺序复制,从 Remove 开始并环绕。
  5. 线程 B 将 Remove 设置为 0 并将 Insert 设置为 16。
  6. 线程 A 获取下一个时间片并返回位置 9 处的项目。
  7. 你刚刚处理了一个乱序的事件,你最终会再次处理它。
  8. 更糟糕的是,您将删除位置 0 处的项目而不对其进行处理。

可能可以通过以下方式解决该问题:

Action nextAction;
lock (_RaiseEventQueueLock)
{
    nextAction = _RaiseEventQueue.Peek();
}
nextAction();

不过,我不会把我的职业生涯押在上面。我建议使用BlockingCollection和生产者/消费者设计。

可能的修复

在我看来,以下内容应该符合您的预期。

private readonly object _queueLock = new object();
private readonly object _processLock = new object();

void SequenceAction(Action action)
{
    lock (_queueLock)
    {
        _RaiseEventQueue.Enqueue(action);
    }
    if (Monitor.TryEnter(_processLock))
    {
        while (true)
        {
            Action a;
            lock (_queueLock)
            {
                if (_RaiseEventQueue.Count == 0) return;
                a = _RaiseEventQueue.Dequeue();
            }
            a();
        }
        Monitor.Exit(_processLock);
    }
}
于 2013-03-06T03:56:54.563 回答
1
    // If action already in progress, add new
    //  action to queue and return.
    // If no action in progress, begin processing
    //  the new action and continue processing
    //  actions added to the queue in the meantime.
    void SequenceAction(Action action) {
       lock (_SequenceActionQueueLock) {
          _SequenceActionQueue.Enqueue(action);
          if (_SequenceActionQueue.Count > 1) {
             return;
          }
       }
       // Would have returned if queue was not empty
       //  when queue was locked and item was enqueued.
       for (;;) {
          action();
          lock (_SequenceActionQueueLock) {
             _SequenceActionQueue.Dequeue();
             if (_SequenceActionQueue.Count == 0) {
                return;
             }
             action = _SequenceActionQueue.Peek();
          }
       }
    }
于 2013-03-07T01:19:10.360 回答