1

我正在IProducerConsumerCollection(T)从一个接口公开一个,该接口将定期添加项目以供另一个线程使用。

public interface IProducer<T>
{
    IProducerConsumerCollection<T> ProducerCollection { get; }
}

我试图将 aBlockingCollection(T)与现有集合一起使用,但似乎IProducerConsumerCollection(T)不支持直接添加到:

var queue = new ConcurrentQueue<string>();
var blockingCollection = new BlockingCollection<string>(queue);

var task1 = Task.Run(() => {
    Console.WriteLine("Dequeued " + blockingCollection.Take());
    Console.WriteLine("Dequeued " + blockingCollection.Take());
});

var task2 = Task.Run(() => {
    Console.WriteLine("Enqueueing Hello");
    queue.Enqueue("Hello");

    Console.WriteLine("Enqueueing World");
    queue.Enqueue("World");
});

Task.WaitAll(task1, task2);

这将无限期挂起,因为BlockingCollection(T)不会注意到新项目。

是否有与该BlockingCollection(T).Take方法类似的功能,或者是否有比以下更简单的方法:

static async Task<T> TakeAsync<T>(
    IProducerConsumerCollection<T> collection,
    CancellationToken token
)
{
    T result;

    while(!collection.TryTake(out result))
    {
        await Task.Yield();
        token.ThrowIfCancellationRequested();
    }

    return result;
}
4

1 回答 1

5

这将无限期挂起,因为 BlockingCollection(T) 不会注意到新项目。

确实 - 因为您直接添加到queue. 从文档中:

不要直接修改底层集合。使用BlockingCollection<T>方法添加或删除元素。BlockingCollection<T>如果直接更改基础集合,对象可能会损坏。

所以你的电话queue.Enqueue应该blockingCollection改用。

就您而言TaskAsync- 您可以使用(-1)TryTake的超时Infinite...但您可能也想阅读Stephen Cleary 的类似主题的博客文章...

于 2013-09-10T17:02:13.173 回答