2

我正在使用队列(C#)来存储必须发送到任何连接的客户端的数据。

我的锁定语句是私有只读的:

private readonly object completedATEQueueSynched = new object();

只有两种方法入队:

1) 由鼠标移动启动,由主窗体线程执行:

public void handleEddingToolMouseMove(MouseEventArgs e)
{
    AbstractTrafficElement de = new...
    sendElementToAllPlayers(de)
    lock (completedATEQueueSynched)
    {
       completedATEQueue.Enqueue(de);
    }
}

2)从一个按钮事件开始,也由 mainform-thread 执行(在这里没关系,但比抱歉更安全):

public void handleBLC(EventArgs e)
{
    AbstractTrafficElement de = new...
    sendElementToAllPlayers(de);
    lock (completedATEQueueSynched)
    {
         completedATEQueue.Enqueue(de);
    }
}

此方法由负责连接的特定客户端的线程调用。这里是:

private void sendSetData(TcpClient c)
{
    NetworkStream clientStream = c.GetStream();
    lock (completedATEQueueSynched)
    {
        foreach (AbstractTrafficElement ate in MainForm.completedATEQueue)
        {
            binaryF.Serialize(clientStream, ate);
        }
    }
}

如果客户端连接并且我同时移动鼠标,则会发生死锁。如果我只锁定迭代,则会抛出 InvalidOperation 执行,因为队列已更改。

我也尝试过同步队列包装器,但它不适用于迭代。(甚至与锁结合)有什么想法吗?我只是不明白我的错误

4

3 回答 3

3

您可以减少争用,可能足以使其可以接受:

private void sendSetData(TcpClient c)
{
    IEnumerable<AbstractTrafficElement> list;

    lock (completedATEQueueSynched)
    {
        list = MainForm.completedATEQueue.ToList();  // take a snapshot
    }

    NetworkStream clientStream = c.GetStream();
    foreach (AbstractTrafficElement ate in list)
    {
       binaryF.Serialize(clientStream, ate);
    }    
}

但当然,快照引入了它自己的一点时序逻辑。在任何特定时刻,“所有元素”到底是什么意思?

于 2012-08-01T21:56:14.293 回答
2

看起来像您想要的ConcurrentQueue

更新

是的,工作正常,TryDequeue 在 Interlocked.CompareExchange 和 SpinWait 中使用。Lock 不是一个好选择,因为太贵了 看看SpinLock 并且不要忘记并行编程的数据结构

她是来自 ConcurrentQueue 的队列,如您所见SpinWait,并且Interlocked.Increment已被使用。看起来不错

public void Enqueue(T item)
{
  SpinWait spinWait = new SpinWait();
  while (!this.m_tail.TryAppend(item, ref this.m_tail))
    spinWait.SpinOnce();
}

  internal void Grow(ref ConcurrentQueue<T>.Segment tail)
  {
    this.m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
    tail = this.m_next;
  }

  internal bool TryAppend(T value, ref ConcurrentQueue<T>.Segment tail)
  {
    if (this.m_high >= 31)
      return false;
    int index = 32;
    try
    {
    }
    finally
    {
      index = Interlocked.Increment(ref this.m_high);
      if (index <= 31)
      {
        this.m_array[index] = value;
        this.m_state[index] = 1;
      }
      if (index == 31)
        this.Grow(ref tail);
    }
    return index <= 31;
  }
于 2012-08-01T21:36:14.470 回答
1

如果您的入队率和出队率不是很高,那么 Henk Holterman 的方法很好。在这里,我认为您正在捕捉鼠标移动。如果您希望在队列中生成大量数据,则上述方法并不好。锁定成为网络代码和排队代码之间的争用。此锁的粒度是整个队列级别

在这种情况下,我会推荐 GSerjo 提到的 - ConcurrentQueue。我研究了这个队列的实现。它非常颗粒状。它在队列中的单个元素级别运行。当一个线程出队时,其他线程可以并行入队而不会停止。

于 2012-08-01T22:54:12.553 回答