0

我有一个运行缓慢的任务要在项目列表上执行。这就是我想要实现的目标:

  1. 处理应立即开始
  2. 最多 N 个并发任务
  3. 每个项目只能运行一次任务
  4. 所有订阅者都应该看到所有结果(无论他们何时订阅)

这是我的原型 - 它按预期工作(LinqPad)

async Task Main()
{
     int throttle = 3;

    var result =
        Enumerable.Range(1, 20)
        .ToObservable()
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Replay();

    Task.Run(async () => await result);
    Util.KeepRunning();
}

public async Task DoSomething(int n)
{
    Console.WriteLine($"Starting {n}");
    await Task.Delay(100);
    Console.WriteLine($"Ending {n}");
}

这是我的实际代码:

    sln.Projects = result
        .Projects
        .ToObservable() //Appears to be unnecessary
        .Select(p => Observable.FromAsync(() => projectLoader.Load(p, sln)))
        .Merge(_maxConcurrent)
        .Replay(); 

     await sln.Projects; //Hangs forever if Replay is used

“结果”是 Microsoft.CodeAnalysisSolution

我尝试了几种变体
- Projects.ToList,带有 ToObservable 和不
带替换项目加载器,带有和不带 _maxConcurrent 的虚拟负载
Ran

这段代码永远挂在等待中,我似乎看不出在功能上与原型有什么不同。

如果我删除重播,代码会按预期运行,但每个订阅者都会受到打击。

有没有人知道这段代码有何不同以及为什么 Replay 方法会影响这样的执行?

我开始怀疑这一定是订阅者的问题——但这不是我现在可以轻松提取来验证的东西。

这是可以工作的原始代码(但不会节流)

    sln.Projects = result.Projects
        .Select(p => projectLoader.Load(p, sln))
        .ToList()
        .Select(p => p.ToObservable())
        .Merge();
4

1 回答 1

0

原型和你实际使用代码的区别在于你存储 observable 的方式。在您的原型中,它被存储为一个局部变量,但在您的生产代码中,您将它保存在sln对象中。

为了能够为未来的订阅者重放 observable 中的所有信号,它无法完成。我认为它与局部变量一起使用的原因与局部范围有关,但我没有深入研究。

如果您在 Linqpad 中运行以下命令,您将看到将 observable 存储在对象中会改变行为。

async Task Main()
{
     int throttle = 3;
    var container = new Container();

    container.Results =
        Enumerable.Range(1, 20)
            .ToObservable()
            .Select(n => Observable.FromAsync(() => DoSomething(n)))
            .Merge(throttle)
            .Replay();

    var res = await container.Results;

    res.Dump("Res");
}

public async Task DoSomething(int n)
{
    Console.WriteLine($"Starting {n}");
    await Task.Delay(100);
    Console.WriteLine($"Ending {n}");
}

public class Container
{
    public IObservable<Unit> Results { get; set; }
}

Replay()返回一个IConnectableObservable,要使这个可观察的返回值,你必须调用Connect()它。这样做使上面的代码对我有用:

async Task Main()
{
     int throttle = 3;
    var container = new Container();

    container.Results =
        Enumerable.Range(1, 20)
        .ToObservable()
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Replay();

    container.Results.Connect();

    (await container.Results).Dump("1");
    (await container.Results).Dump("2");
}

public async Task DoSomething(int n)
{
    Console.WriteLine($"Starting {n}");
    await Task.Delay(100);
    Console.WriteLine($"Ending {n}");
}

public class Container
{
    public IConnectableObservable<Unit> Results { get; set; }
}
于 2017-07-19T05:35:00.487 回答