我有一个运行缓慢的任务要在项目列表上执行。这就是我想要实现的目标:
- 处理应立即开始
- 最多 N 个并发任务
- 每个项目只能运行一次任务
- 所有订阅者都应该看到所有结果(无论他们何时订阅)
这是我的原型 - 它按预期工作(LinqPad)
async Task Main()
{
int throttle = 3;
var result =
Enumerable.Range(1, 20)
.ToObservable()
.Select(n => Observable.FromAsync(() => DoSomething(n)))
.Merge(throttle)
.Replay();
Task.Run(async () => await result);
Util.KeepRunning();
}
public async Task DoSomething(int n)
{
Console.WriteLine($"Starting {n}");
await Task.Delay(100);
Console.WriteLine($"Ending {n}");
}
这是我的实际代码:
sln.Projects = result
.Projects
.ToObservable() //Appears to be unnecessary
.Select(p => Observable.FromAsync(() => projectLoader.Load(p, sln)))
.Merge(_maxConcurrent)
.Replay();
await sln.Projects; //Hangs forever if Replay is used
“结果”是 Microsoft.CodeAnalysisSolution
我尝试了几种变体
- Projects.ToList,带有 ToObservable 和不
带替换项目加载器,带有和不带 _maxConcurrent 的虚拟负载
Ran
这段代码永远挂在等待中,我似乎看不出在功能上与原型有什么不同。
如果我删除重播,代码会按预期运行,但每个订阅者都会受到打击。
有没有人知道这段代码有何不同以及为什么 Replay 方法会影响这样的执行?
我开始怀疑这一定是订阅者的问题——但这不是我现在可以轻松提取来验证的东西。
这是可以工作的原始代码(但不会节流)
sln.Projects = result.Projects
.Select(p => projectLoader.Load(p, sln))
.ToList()
.Select(p => p.ToObservable())
.Merge();