3 回答
简短的回答:
在检查结果之前,您没有足够的时间来执行测试的异步部分。
更长的答案:
这个测试执行的动作序列有点像:
- 设置一个
IObservable
将异步调用委托的 - 将其链接到另一个
IObservable
是其中两个异步调用的合并结果,加在一起 Subscribe
到结果IObservable
,导致方法的两次异步调用- 嗬!委托中有a
Thread.Sleep
,所以异步调用被阻塞! - 立即检查结果,这当然是 0 - 两个阻塞的异步调用还没有“完成”
有很多方法可以“解决”这个问题:
- 去除那个
Thread.Sleep
- 通过更改将调用更改为同步
BeginInvoke
,尽管这需要对测试进行整体重组 - 用 a
HistoricalScheduler
代替Immediate
那个
在尝试对 Rx 内容进行单元测试时,强烈建议使用 a HistoricalScheduler
- 基本上,它可以让您在虚拟时间中向前和向后跳跃,这是测试 Rx 查询等与时间相关的代码的关键特性:
var theTardis = new HistoricalScheduler();
Func<int, int> inc = (int x) =>
{
theTardis.Sleep(TimeSpan.FromMilliseconds(1500));
return x + 1;
};
double result = 0;
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,inc.EndInvoke);
incAsync(1).Merge(incAsync(9)).Sum()
.SubscribeOn(theTardis)
.Subscribe(n => result = n);
// To the FUTURE!
theTardis.AdvanceBy(TimeSpan.FromSeconds(5));
Assert.AreEqual(12, result);
这是同步版本的样子——你所拥有的最直接的版本。Single()
将阻塞直到 observable 完成。阻塞通常是您想要避免的事情,但如果您只是在乱搞也没关系。
public void AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
int sum = incAsync(1).Merge(incAsync(9)).Sum().Single();
Assert.AreEqual(12, sum);
}
和一个异步 TPL 版本,使用await
:
public async Task AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
int sum = await incAsync(1).Merge(incAsync(9)).Sum();
Assert.AreEqual(12, sum);
}
最后是一个使用 Rx 的异步程序Do()
——如果说这是更大操作的一部分,那就太好了:
public async Task AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
await incAsync(1).Merge(incAsync(9)).Sum().Do(sum =>
{
Assert.AreEqual(12, sum);
});
}
在我看来,这是源代码中的一个错误,并且 AsynchronousRunInParallel 缺少结果等待,例如在 TheBloodyHardAsyncInvokationPatter 中使用的结果。
使用 FromAsyncPattern 时,只在当前线程中同步执行 BeginInvoke。实际工作和结果处理将安排在 ThreadPool 上。因此,一旦安排了 2 个异步增量,就会开始执行 Assert,而无需等待完成。
我已经添加:
ThreadUtils.WaitUntil(() => result != 0.0);
所以结果看起来像:
incAsync(1).Merge(incAsync(9)).
Sum().SubscribeOn(Scheduler.Immediate).
Subscribe(n => result = n);
ThreadUtils.WaitUntil(() => result != 0.0);
Assert.AreEqual(12, result);
或者您可能希望将“订阅”替换为“运行”。它是 Koan 本身的辅助方法,它将使用手动事件等待:
incAsync(1).Merge(incAsync(9)).
Sum().SubscribeOn(Scheduler.Immediate).
Run(n => result = n);
Assert.AreEqual(12, result);