30

我正在将(坦率地说很棒)BlockingCollection<T>类型用于高度多线程的高性能应用程序。

通过集合有很多吞吐量,并且在微观层面上它是高性能的。但是,对于每个“批次”,它总是通过标记取消令牌来结束。这会导致在任何等待Take调用时引发异常。这很好,但我会选择返回值或输出参数来发出信号,因为 a) 异常有明显的开销,b) 在调试时,我不想手动关闭特定的中断异常例外。

实现似乎很激烈,理论上我想我可以反汇编并重新创建我自己的不使用异常的版本,但也许有一种不太复杂的方法?

我可以null在集合中添加一个(或者如果不是,一个占位符)对象来表示进程应该结束,但是还需要一种很好地中止的方法,即唤醒等待线程并以某种方式告诉它们发生了什么事。

那么 - 替代集合类型?重新创建我自己的?有什么方法可以滥用这个?

(一些上下文:我选择了它,BlockingCollection<T>因为它比手动锁定 a 具有优势Queue。尽我所知,线程原语的使用非常棒,在我的情况下,这里和那里的几毫秒和最佳核心至关重要。 )

编辑:我刚刚为此开了一笔赏金。我不相信Anastasiosyal 的回答涵盖了我在评论中提出的问题。我知道这是一个棘手的问题。有人可以提供帮助吗?

4

5 回答 5

9

正如我猜你自己已经完成了,查看 BlockingCollection 的反射源,不幸的是,当 CancellationToken 被传递到 BlockingCollection 并取消时,你将得到 OperationCancelledException,如下图所示(有几个图片后的解决方法)

GetConsumingEnumerable调用TryTakeWithNoTimeValidationBlockingCollection 进而引发此异常。

在此处输入图像描述

解决方法 #1

一种可能的策略是,假设您对生产者和消费者拥有更多控制权,而不是将取消令牌传递给 BlockingCollection(这将引发此异常),您将取消令牌传递给生产者和消费者。

如果您的生产者没有生产并且您的消费者没有消费,那么您已经有效地取消了操作而没有引发此异常并通过在 BlockingCollection 中传递 CancellationToken.None。

BlockingCollection 为 BoundedCapacity 或 Empty 时取消的特殊情况

生产者阻塞:当达到 BlockingCollection 上的 BoundedCapacity 时,生产者线程将被阻塞。因此,当尝试取消并且 BlockingCollection 处于 BoundedCapacity 时(这意味着您的消费者没有被阻止但生产者被阻止,因为他们无法将任何其他项目添加到队列中),那么您将需要允许使用其他项目(一个对于每个生产者线程),这将解除对生产者的阻塞(因为它们在添加到blockingCollection时被阻塞)并反过来允许您的取消逻辑在生产者端启动。

消费者阻塞:当你的消费者因为队列为空而被阻塞时,你可以在阻塞集合中插入一个空的工作单元(每个消费者线程一个),以便解除消费者线程的阻塞并允许你的取消逻辑启动消费者方面。

当队列中有项目且未达到 BoundedCapacity 或 Empty 等限制时,不应阻塞生产者和消费者线程。

解决方法 #2

使用取消工作单元。

当您的应用程序需要取消时,您的生产者(可能只有 1 个生产者就足够了,而其他生产者只是取消生产)将产生一个取消工作单元(可能是您也提到的 null 或一些实现标记接口的类)。当消费者消费这个工作单元并检测到它实际上是一个取消工作单元时,他们的取消逻辑就会启动。要产生的取消工作单元的数量需要等于消费者线程的数量。

同样,当我们接近 BoundedCapacity 时需要小心,因为这可能表明一些生产者被阻止了。根据生产者/消费者的数量,您可以让消费者消费,直到所有生产者(但 1 个)都关闭。这确保了周围没有挥之不去的生产者。当只剩下 1 个生产者时,您的最后一个消费者可以关闭,生产者可以停止生产取消工作单元。

于 2012-01-26T11:27:27.443 回答
1

我刚才做的 BlockingQueue 怎么样?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

它应该没有任何例外。当前队列只是在 dispose 上关闭事件,这可能不是您想要的。您可能希望对 null 进行 enque 并等到所有项目都已处理完毕。除此之外,它应该适合您的需求。

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Deques an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immedieately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// The will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits the until empty. This does not mean that all items are already process. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns a untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}
于 2012-06-16T10:52:29.073 回答
1

您可以通过在最后一项上设置标志来发出批次结束的信号(向其添加 IsLastItem bool 属性或包装它)。或者您可以将 null 作为最后一项发送(但不确定 null 是否正确通过了 blockingcollection)。

如果您可以消除对“批处理”概念的需求,您可以创建一个额外的线程来持续 Take() 并从您的阻塞集合中处理新数据,并且什么也不做。

于 2012-06-19T15:26:28.610 回答
0

基伦,

根据我的检查,我个人不知道 ProducerConsumer 模式的任何线程安全类型,它完全符合您的要求。我不认为这是有竞争力的解决方案,但建议您BlockingCollection<T>使用少量进行装饰,extension method这将使您可以自由地提供任何内置或自定义类型而不是 default CancellationToken

阶段1:

以下是使用底层TryAddWithNoTimeValidation方法添加到队列的默认方法列表。

public void Add(T item){
      this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
      this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
    }

public bool TryAdd(T item){
      return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
    }

public bool TryAdd(T item, TimeSpan timeout){
      BlockingCollection<T>.ValidateTimeout(timeout);
      return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
    }

public bool TryAdd(T item, int millisecondsTimeout){
      BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
      return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new           CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
 BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
 return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

现在您可以为您感兴趣的任何/所有方法提供扩展。

第 2 阶段:

您现在参考您的实现TryAddWithNoTimeValidation而不是默认值。

我可以给你一个替代版本,TryAddWithNoTimeValidation它可以安全地继续而不抛出OperationCancellation异常。

于 2012-06-20T13:23:17.603 回答
0

My suggestion is to implement this functionality by encapsulating an asynchronous queue, like the BufferBlock<T> class from the TPL Dataflow library. This class is a thread-safe container intended for producer-consumer scenarios, and supports backpressure (BoundedCapacity) just like the BlockingCollection<T> class. Being asynchronous means that the corresponding Add/Take methods (SendAsync/ReceiveAsync) return tasks. These tasks store the event of a cancellation as an internal state, that can be queried with the IsCanceled property, so throwing exceptions internally can be avoided. Propagating this state with exceptions can also be avoided, by waiting the tasks using a exception-suppressing continuation (ContinueWith). Here is an implementation:

/// <summary>
/// A thread-safe collection that provides blocking and bounding capabilities.
/// The cancellation is propagated as a false result, and not as an exception.
/// </summary>
public class CancellationFriendlyBlockingCollection<T>
{
    private readonly BufferBlock<T> _bufferBlock;

    public CancellationFriendlyBlockingCollection()
    {
        _bufferBlock = new BufferBlock<T>();
    }

    public CancellationFriendlyBlockingCollection(int boundedCapacity)
    {
        _bufferBlock = new BufferBlock<T>(new() { BoundedCapacity = boundedCapacity });
    }

    public bool TryAdd(T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) return false;
        if (_bufferBlock.Post(item)) return true;
        Task<bool> task = _bufferBlock.SendAsync(item, cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        return task.Result;
    }

    public bool TryTake(out T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) { item = default; return false; }
        if (_bufferBlock.TryReceive(out item)) return true;
        Task<T> task = _bufferBlock.ReceiveAsync(cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        item = task.Result; return true;
    }

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        while (TryTake(out var item, cancellationToken)) yield return item;
    }

    public void CompleteAdding() => _bufferBlock.Complete();
    public bool IsCompleted => _bufferBlock.Completion.IsCompleted;
    public int Count => _bufferBlock.Count;

    // Wait the task to complete without throwing exceptions
    private static void WaitNoThrow(Task task)
    {
        if (task.IsCompleted) return;
        task.ContinueWith(_ => { }, default,
            TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Wait();
        Debug.Assert(task.IsCompleted);
    }
}

Performance: The CancellationFriendlyBlockingCollection.TryTake method can be invoked with a canceled CancellationToken in a loop with a frequency of about 15,000,000 times per second in my PC (on a single thread). For comparison the frequency of the BlockingCollection<T>.Take under the same conditions is about 20,000 times per second.

You might be tempted to replace the BufferBlock<T> with a more modern asynchronous queue like the Channel<T>. In that case please make sure to read this question first, in order to be aware about a leaky behavior of this class, under specific conditions.

于 2021-11-21T18:31:01.977 回答