这对我来说听起来很像 TPL 的问题。您有一组已知的静止数据。您希望将一些繁重的处理划分为并行运行,并且您希望能够批处理负载。
我在您的问题的任何地方都看不到异步源、动态数据源或需要反应的消费者。这是我建议您改用 TPL 的理由。
在单独的说明中,为什么要并行处理 10 的幻数?这是业务需求,还是潜在的优化性能的尝试?通常,最佳实践是允许 TaskPool 根据内核数量和当前负载计算出最适合客户端 CPU 的内容。我想随着设备及其 CPU 结构(单核、多核、多核、低功耗/禁用内核等)的巨大变化,这一点变得越来越重要。
这是您可以在 LinqPad 中执行的一种方法(但请注意缺少 Rx)
void Main()
{
var source = new List<Item>();
for (int i = 0; i < 100; i++){source.Add(new Item(i));}
//Put into batches of ten, but only then pass on the item, not the temporary tuple construct.
var batches = source.Select((item, idx) =>new {item, idx} )
.GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item);
//Process one batch at a time (serially), but process the items of the batch in parallel (concurrently).
foreach (var batch in batches)
{
"Processing batch...".Dump();
var results = batch.AsParallel().Select (item => item.Process());
foreach (var result in results)
{
result.Dump();
}
"Processed batch.".Dump();
}
}
public class Item
{
private static readonly Random _rnd = new Random();
private readonly int _id;
public Item(int id)
{
_id = id;
}
public int Id { get {return _id;} }
public double Process()
{
var threadId = Thread.CurrentThread.ManagedThreadId;
string.Format("Processing on thread:{0}", threadId).Dump(Id);
var loopCount = _rnd.Next(10000,1000000);
Thread.SpinWait(loopCount);
return _rnd.NextDouble();
}
public override string ToString()
{
return string.Format("Item:{0}", _id);
}
}
我很想知道您是否确实存在动态数据问题或反应性消费者问题,但只是“简化”了问题以使其更易于解释。