3
4

3 回答 3

4

简短的回答:

在检查结果之前,您没有足够的时间来执行测试的异步部分。

更长的答案:

这个测试执行的动作序列有点像:

  • 设置一个IObservable将异步调用委托的
  • 将其链接到另一个IObservable是其中两个异步调用的合并结果,加在一起
  • Subscribe到结果IObservable,导致方法的两次异步调用
  • 嗬!委托中有a Thread.Sleep,所以异步调用被阻塞!
  • 立即检查结果,这当然是 0 - 两个阻塞的异步调用还没有“完成”

有很多方法可以“解决”这个问题:

  • 去除那个Thread.Sleep
  • 通过更改将调用更改为同步BeginInvoke,尽管这需要对测试进行整体重组
  • 用 aHistoricalScheduler代替Immediate那个

在尝试对 Rx 内容进行单元测试时,强烈建议使用 a HistoricalScheduler- 基本上,它可以让您在虚拟时间中向前和向后跳跃,这是测试 Rx 查询等与时间相关的代码的关键特性:

var theTardis = new HistoricalScheduler();

Func<int, int> inc = (int x) =>
{
    theTardis.Sleep(TimeSpan.FromMilliseconds(1500));
    return x + 1;
};
double result = 0;
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,inc.EndInvoke);

incAsync(1).Merge(incAsync(9)).Sum()
    .SubscribeOn(theTardis)
    .Subscribe(n => result = n);

// To the FUTURE!
theTardis.AdvanceBy(TimeSpan.FromSeconds(5));

Assert.AreEqual(12, result);
于 2013-03-07T21:00:43.450 回答
3

这是同步版本的样子——你所拥有的最直接的版本。Single()将阻塞直到 observable 完成。阻塞通常是您想要避免的事情,但如果您只是在乱搞也没关系。

public void AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };

    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);

    int sum = incAsync(1).Merge(incAsync(9)).Sum().Single();

    Assert.AreEqual(12, sum);
}

和一个异步 TPL 版本,使用await

public async Task AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };

    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);

    int sum = await incAsync(1).Merge(incAsync(9)).Sum();

    Assert.AreEqual(12, sum);
}

最后是一个使用 Rx 的异步程序Do()——如果说这是更大操作的一部分,那就太好了:

public async Task AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };

    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);

    await incAsync(1).Merge(incAsync(9)).Sum().Do(sum =>
    {
        Assert.AreEqual(12, sum);
    });
}
于 2013-03-08T01:12:36.257 回答
2

在我看来,这是源代码中的一个错误,并且 AsynchronousRunInParallel 缺少结果等待,例如在 TheBloodyHardAsyncInvokationPatter 中使用的结果。

使用 FromAsyncPattern 时,只在当前线程中同步执行 BeginInvoke。实际工作和结果处理将安排在 ThreadPool 上。因此,一旦安排了 2 个异步增量,就会开始执行 Assert,而无需等待完成。

我已经添加:

ThreadUtils.WaitUntil(() => result != 0.0);

所以结果看起来像:

        incAsync(1).Merge(incAsync(9)).
            Sum().SubscribeOn(Scheduler.Immediate).
            Subscribe(n => result = n);
        ThreadUtils.WaitUntil(() => result != 0.0);
        Assert.AreEqual(12, result);

或者您可能希望将“订阅”替换为“运行”。它是 Koan 本身的辅助方法,它将使用手动事件等待:

incAsync(1).Merge(incAsync(9)).
             Sum().SubscribeOn(Scheduler.Immediate).
             Run(n => result = n);
Assert.AreEqual(12, result);
于 2014-01-03T01:43:08.460 回答