1

我有多个线程生成项目并将它们粘贴在一个共同的位置ConcurrentQueue

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

我有另一个单一的消费者线程,但它需要在这个应用程序的上下文中工作的方式是,偶尔,它只需要抓住当前线程队列中的所有内容,将其从该队列中删除,一键完成。就像是:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到类似“同时从队列中获取所有对象”,甚至“同时与另一个队列交换内容”之类的东西。

如果我放弃并只用 aConcurrentQueue保护正常,我可以做到这一点,如下所示:Queuelock

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

但是,我喜欢它的便利性,ConcurrentQueue而且因为我刚刚学习 C#,所以我对 API 很好奇;所以我的问题是,有没有办法用其中一个并发集合来做到这一点?

是否有某种方法可以访问任何同步对象ConcurrentQueue使用并为我自己的目的将其锁定,以便一切都可以很好地协同工作?然后我可以锁定它,拿走所有东西,然后释放?

4

2 回答 2

3

这取决于你想做什么。根据源代码中的注释

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

这通过在内部调用ToList() 来工作,而 ToList()反过来又可以工作m_numSnapshotTakers自旋机制

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

如果您只需要快照,那么您很幸运。但是,似乎没有内置方法可以ConcurrentQueue以线程安全的方式从 a 中获取和删除所有项目。您将需要通过使用lock或类似方法烘焙您自己的同步。或者自己动手(从源头上看可能并不难)。

于 2020-02-10T21:32:20.853 回答
2

没有这样的方法,因为TakeEverything实际上应该做什么是模棱两可的:

  1. 逐项取件,直到队列为空,然后返回取件。
  2. 锁定对队列的整个访问,拍摄快照(循环拍摄所有项目)= 清除队列,解锁,返回快照。

考虑第一种情况并想象在您从队列中逐个删除项目时其他线程正在写入队列 -TakeEverything方法是否应该在结果中包含这些项目?

如果是,那么您可以将其写为:

public List<GeneratedItem> TakeEverything()
{
    var list = new List<GeneratedItem>();

    while (queuedItems.TryDequeue(out var item))
    {
        list.Add(item);
    }

    return list;
}

如果不是,那么我仍然会使用ConcurrentQueue因为普通的所有实例成员 - 方法和属性 -Queue都不是线程安全的)并为每个读/写访问实现自定义锁定,因此您确保在“获取所有内容”时没有添加项目从队列中。

于 2020-02-10T21:43:14.570 回答