5

原始问题

我有一个场景,我有多个IObservable想要组合Merge然后收听的序列。但是,如果其中一个产生错误,我不希望它使其他流的所有内容崩溃,也不想重新订阅序列(这是一个“永恒的”序列)。

我通过Retry()在合并之前将 a 附加到流中来做到这一点,即:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

但是,当我想对此进行测试时,问题就出现了。我想测试的是,如果其中一个IObservables inobservables产生一个OnError,其他的应该仍然能够通过它们发送它们的值并且它们应该得到处理

我以为我只用两个Subject<int>s 代表两个IObservables in observables; 一个发送 an OnError(new Exception()),另一个发送OnNext(1). 但是,它似乎Subject<int>会为新订阅重播所有先前的值(实际上Retry()是),将测试变成无限循环。

我试图通过创建一个手册来解决它,该手册IObservable在第一次订阅时产生错误,然后是一个空序列,但感觉很hacky:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

我是否以错误的方式使用Subject或思考?Retry()对此还有其他想法吗?你将如何解决这种情况?

更新

好的,这是我想要和想到 Retry()的大理石图。

o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

我的问题可能更多是因为我没有一个好的股票类来使用前测试,因为Subject我想重播我以前的所有错误。

更新 2

这是一个测试用例,它显示了我对Subject重放其值的意思。如果我说它以冷酷的方式这样做,我是否正确使用了该术语?我知道Subject这是一种创建热可观察对象的方法,但这种行为对我来说仍然感觉“冷”。

var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);
4

1 回答 1

4

根据您更新的要求(您想重试失败的可观察对象,而不是只想忽略它们),我们可以提出一个可行的解决方案。

首先,重要的是要了解冷可观察对象(在每个订阅上重新创建)和热可观察对象(无论订阅如何都存在)之间的区别。您不能Retry()使用热可观察对象,因为它不知道如何重新创建基础事件。也就是说,如果一个 hot observable 错误,它就永远消失了。

Subject创建一个 hot observable,从某种意义上说,您可以在OnNext没有订阅者的情况下进行调用,并且它将按预期运行。要将热 observable 转换为冷 observable,您可以使用Observable.Defer,它将包含该 observable 的“订阅创建”逻辑。

综上所述,这是修改后的原始代码:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), 
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };                                            

observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

和测试(与之前类似):

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

和预期的输出:

1
2
-1
done

当然,您需要根据您所观察到的底层内容来显着修改此概念。使用主题进行测试与实际使用它们不同。

我还想指出这条评论:

但是,似乎 Subject 会为新订阅重放所有以前的值(实际上是 Retry() ),从而将测试变成无限循环。

不是真的 -Subject不这样做。基于Retry重新创建订阅的事实,您的代码的其他一些方面会导致无限循环,并且订阅会在某些时候产生错误。


原始答案(完成)

问题是它Retry()没有做你想做的事。从这里:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

重复源 observable 序列 retryCount 次或直到它成功终止。

这意味着Retry它将不断尝试并重新连接到底层的 observable,直到它成功并且不抛出错误。

我的理解是,您实际上希望 observable 中的异常被忽略,而不是重试。这将做你想做的事:

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

这用于Catch捕获异常的可观察对象,并在此时将其替换为空的可观察对象。

这是使用主题的完整测试:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

正如预期的那样,这会产生:

1
2
done
于 2012-06-11T11:10:15.457 回答