原始问题
我有一个场景,我有多个IObservable
想要组合Merge
然后收听的序列。但是,如果其中一个产生错误,我不希望它使其他流的所有内容崩溃,也不想重新订阅序列(这是一个“永恒的”序列)。
我通过Retry()
在合并之前将 a 附加到流中来做到这一点,即:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
但是,当我想对此进行测试时,问题就出现了。我想测试的是,如果其中一个IObservable
s inobservables
产生一个OnError
,其他的应该仍然能够通过它们发送它们的值并且它们应该得到处理
我以为我只用两个Subject<int>
s 代表两个IObservable
s in observables
; 一个发送 an OnError(new Exception())
,另一个发送OnNext(1)
. 但是,它似乎Subject<int>
会为新订阅重播所有先前的值(实际上Retry()
是),将测试变成无限循环。
我试图通过创建一个手册来解决它,该手册IObservable
在第一次订阅时产生错误,然后是一个空序列,但感觉很hacky:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
我是否以错误的方式使用Subject
或思考?Retry()
对此还有其他想法吗?你将如何解决这种情况?
更新
好的,这是我想要和想到 Retry()
的大理石图。
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
我的问题可能更多是因为我没有一个好的股票类来使用前测试,因为Subject
我想重播我以前的所有错误。
更新 2
这是一个测试用例,它显示了我对Subject
重放其值的意思。如果我说它以冷酷的方式这样做,我是否正确使用了该术语?我知道Subject
这是一种创建热可观察对象的方法,但这种行为对我来说仍然感觉“冷”。
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);