对于一个多线程应用程序，我想 await 直到一个 BlockingCollection
完成并且为空（即 IsCompleted = true）。我实现了下面的代码，它看起来可以正常工作。
但由于涉及多线程，我连自己的影子都不敢相信。这是一个足够健壮的实现吗？
/// <summary>
/// A <see cref="BlockingCollection{T}"/> that exposes an awaitable <see cref="WaitCompleted"/> task,
/// which finishes once the collection is marked complete for adding and has been drained
/// (that is, <see cref="BlockingCollection{T}.IsCompleted"/> is <c>true</c>).
/// </summary>
/// <remarks>
/// <para>
/// The completion signal is raised from <see cref="CompleteAdding"/> and from
/// <see cref="GetConsumingEnumerable()"/>. All other consuming members are hidden and throw, so the
/// last item cannot be removed through this type without the signal being evaluated.
/// </para>
/// <para>
/// The members are hidden with <c>new</c>, not overridden. Calls made through a reference typed as
/// <see cref="BlockingCollection{T}"/> bind to the base implementations and bypass the signalling.
/// </para>
/// <para>
/// <see cref="WaitCompleted"/> only reports that the collection is empty and closed. A consumer may
/// still be processing the last item it took when the task completes.
/// </para>
/// </remarks>
/// <typeparam name="T">The type of the items in the collection.</typeparam>
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
    // RunContinuationsAsynchronously keeps code awaiting WaitCompleted from running inline on the
    // producer/consumer thread that happens to raise the signal.
    private readonly TaskCompletionSource _completed =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>
    /// Gets a task that completes when the collection has been marked complete for adding and is empty.
    /// </summary>
    public Task WaitCompleted => _completed.Task;

    /// <summary>
    /// Marks the collection as not accepting any more additions and completes
    /// <see cref="WaitCompleted"/> right away if the collection is already empty.
    /// </summary>
    public new void CompleteAdding()
    {
        base.CompleteAdding();
        SignalIfCompleted();
    }

    /// <summary>
    /// Returns a consuming enumerable over the collection. When an enumeration ends - normally,
    /// through an early <c>break</c>, or because the consumer threw - <see cref="WaitCompleted"/> is
    /// completed if the collection is closed and empty at that point.
    /// </summary>
    /// <returns>A sequence that removes and yields items from the collection.</returns>
    public new IEnumerable<T> GetConsumingEnumerable()
    {
        try
        {
            foreach (var item in base.GetConsumingEnumerable())
            {
                yield return item;
            }
        }
        finally
        {
            // The finally block runs when the enumerator is disposed, which foreach always does.
            // This covers a consumer that took the last item and then left its loop without asking
            // for another one; code placed after the loop would be skipped in that case.
            SignalIfCompleted();
        }
    }

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new T Take() => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();

    /// <summary>Not supported: consume through <see cref="GetConsumingEnumerable()"/> instead.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();

    /// <summary>
    /// Completes <see cref="WaitCompleted"/> if the collection is closed for adding and empty.
    /// </summary>
    private void SignalIfCompleted()
    {
        if (IsCompleted)
        {
            // TrySetResult is atomic and idempotent, so several consumers finishing at the same
            // time need no extra lock and cannot trigger an InvalidOperationException.
            _completed.TrySetResult();
        }
    }
}
用法:
// Usage example: two consumers drain the same collection while the caller awaits its completion.
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

// The consumer tasks are intentionally not awaited; the discards make that explicit.
_ = Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 1
    }
});
_ = Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 2
    }
});

// Completes once the collection is closed for adding and empty. A consumer may still be busy
// with the last item it took, so keep and await the tasks above if that work must have finished too.
await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is empty