5

有没有办法从阻塞集合中批量处理项目集合。例如

我有一个消息总线发布者调用blockingCollection.Add()

还有一个像这样创建的消费线程:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });

但是,我只希望控制台在阻塞集合上有 10 个项目后写入,而 GetConsumingEnumerable() 总是在添加每个项目后触发。我可以为此编写自己的队列,但如果可能的话我想使用阻塞集合?

4

2 回答 2

4

一个快速的解决方案是这样的

public class ConsoleQueue
{
    private readonly List<string> _values = new List<string>();

    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}

那么你可以像这样使用它

        var queue = new ConsoleQueue();

        Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
            {
                queue.Push(value);
            }
        });

您可以轻松扩展它以涵盖线程安全等

于 2014-02-11T14:27:52.160 回答
3

不确定项目要求是什么,但我推荐TPL DataFlow BatchBlock

您将实例化 a BatchBlock<string>,将其绑定到 anActionBlock<string>然后发布到批处理块。

伪代码可能如下所示:

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray=>{ 
    foreach(var msg in msgArray) 
        Console.Writeline(msg);
});

bb.LinkTo(ab);

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
      bb.Post(value);
}

使用 DataFlow,您甚至可能想用 BufferBlock 替换 BlockingCollection,或者直接发布到缓冲区块而不先添加到阻塞集合,因为批处理块已经是线程安全的。

于 2014-02-11T14:38:59.563 回答