My suggestion is to implement this functionality by encapsulating an asynchronous queue, like the BufferBlock<T>
class from the TPL Dataflow library. This class is a thread-safe container intended for producer-consumer scenarios, and supports backpressure (BoundedCapacity
) just like the BlockingCollection<T>
class. Being asynchronous means that the corresponding Add
/Take
methods (SendAsync
/ReceiveAsync
) return tasks. These tasks store the event of a cancellation as an internal state, that can be queried with the IsCanceled
property, so throwing exceptions internally can be avoided. Propagating this state with exceptions can also be avoided, by waiting the tasks using a exception-suppressing continuation (ContinueWith
). Here is an implementation:
/// <summary>
/// A thread-safe collection that provides blocking and bounding capabilities.
/// The cancellation is propagated as a false result, and not as an exception.
/// </summary>
public class CancellationFriendlyBlockingCollection<T>
{
private readonly BufferBlock<T> _bufferBlock;
public CancellationFriendlyBlockingCollection()
{
_bufferBlock = new BufferBlock<T>();
}
public CancellationFriendlyBlockingCollection(int boundedCapacity)
{
_bufferBlock = new BufferBlock<T>(new() { BoundedCapacity = boundedCapacity });
}
public bool TryAdd(T item, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested) return false;
if (_bufferBlock.Post(item)) return true;
Task<bool> task = _bufferBlock.SendAsync(item, cancellationToken);
WaitNoThrow(task);
if (!task.IsCompletedSuccessfully) return false;
return task.Result;
}
public bool TryTake(out T item, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested) { item = default; return false; }
if (_bufferBlock.TryReceive(out item)) return true;
Task<T> task = _bufferBlock.ReceiveAsync(cancellationToken);
WaitNoThrow(task);
if (!task.IsCompletedSuccessfully) return false;
item = task.Result; return true;
}
public IEnumerable<T> GetConsumingEnumerable(
CancellationToken cancellationToken = default)
{
while (TryTake(out var item, cancellationToken)) yield return item;
}
public void CompleteAdding() => _bufferBlock.Complete();
public bool IsCompleted => _bufferBlock.Completion.IsCompleted;
public int Count => _bufferBlock.Count;
// Wait the task to complete without throwing exceptions
private static void WaitNoThrow(Task task)
{
if (task.IsCompleted) return;
task.ContinueWith(_ => { }, default,
TaskContinuationOptions.ExecuteSynchronously |
TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Wait();
Debug.Assert(task.IsCompleted);
}
}
Performance: The CancellationFriendlyBlockingCollection.TryTake
method can be invoked with a canceled CancellationToken
in a loop with a frequency of about 15,000,000 times per second in my PC (on a single thread). For comparison the frequency of the BlockingCollection<T>.Take
under the same conditions is about 20,000 times per second.
You might be tempted to replace the BufferBlock<T>
with a more modern asynchronous queue like the Channel<T>
. In that case please make sure to read this question first, in order to be aware about a leaky behavior of this class, under specific conditions.