0

对于多线程应用程序,我想await直到 aBlockingCollection完成并为空(IsCompleted = true)。我实现了以下,这似乎工作。

由于它是多线程的,我什至不相信我自己的影子。这会是一个健壮的实现吗?

public class BlockingCollectionEx<T> : BlockingCollection<T>
{
    public Task WaitCompleted => completedManualResetEvent.Task;
    private readonly TaskCompletionSource completedManualResetEvent = new();

    public new void CompleteAdding()
    {
        base.CompleteAdding();

        lock (completedManualResetEvent)
        {
            if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in base.GetConsumingEnumerable())
            yield return item;

        lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException 
        {
            if (!completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new T Take() => throw new NotImplementedException();
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}

用法:

var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 1
});
Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
        // do stuff in Task 2
});

await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy
4

1 回答 1

1

您的实现对于一般用途来说并不健壮,但对于遵守以下合同的应用程序来说可能已经足够了:

该集合必须仅由一个消费者使用,该消费者仅使用该GetConsumingEnumerable方法来使用它。

  1. 如果没有消费者,集合为空,CompleteAdding调用方法,WaitCompleted任务永远不会完成。
  2. 如果有两个或更多消费者,则枚举将失败,InvalidOperationException除了一个消费者之外的所有消费者。
  3. 如果只有一个消费者,但使用TakeorTryTake方法消费了集合,则WaitCompleted任务永远不会完成。

在不知道您的具体用例的情况下,我不能说您是否有正当理由请求此功能。但总的来说,等待 aBlockingCollection<T>变为空并完成的确切时刻通常并不重要。重要的是完成所有消费项目的处理的确切时刻,这发生收集完成之后。


注意:此答案针对此问题的修订版 1

于 2021-06-23T15:46:33.273 回答