假设我有两个返回整数 1 到 5 的序列。
第一个返回 1、2 和 3 非常快,但 4 和 5 每个需要 200 毫秒。
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
第二个以 200 毫秒的延迟返回 1、2 和 3,但快速返回 4 和 5。
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
联合这两个序列给我的只是数字 1 到 5。
FastFirst().Union(SlowFirst());
我不能保证这两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证我的解决方案。因此,我想并行化联合,以最大限度地减少我的示例中的(人为)延迟。
一个真实的场景:我有一个返回一些实体的缓存和一个返回所有实体的数据源。我希望能够从内部并行化对缓存和数据源的请求的方法返回一个迭代器,以便缓存结果尽可能快地产生。
注 1:我意识到这仍然在浪费 CPU 周期;我不是在问如何防止序列迭代它们的慢元素,而是问我如何尽可能快地合并它们。
更新 1:我已经定制了 achitaka-san 的出色响应以接受多个生产者,并使用 ContinueWhenAll 设置 BlockingCollection 的 CompleteAdding 一次。我只是把它放在这里,因为它会因为缺少评论格式而丢失。任何进一步的反馈都会很棒!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}