1

我有一个用户编程场景,用户最终可以创建两个相互依赖的 observables。RxJS 不允许循环依赖,据我所知,内存或堆栈达到了它的限制,并且onError使用 value 触发回调true

如何显式检测循环依赖并抛出更具描述性的错误消息?

这段代码说明了如何在 RxJS 中创建循环依赖:

var obsA,
    obsB;

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(obsB, function (a, b) {
        return a + b;
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(obsA, function (b, a) {
        return b + a;
    });


obsA
    .subscribe(function (val) {
        console.log('onNext:' + val);
    },
    function (err) {
        console.error('onError: ' + err);
    },
    function () {
        console.log('onCompleted');
    });

错误信息很简单true

4

1 回答 1

2

原始问题中的代码不会创建循环依赖。在你定义的时候ObsAObsBundefined,所以你真正做的是调用combineLatest(undefined, function ...)。所以你看到的错误是因为你传递undefinedcombinedLatest().

创建一个真正的循环依赖实际上需要一些努力。如果你使用defer,那么你将有一个真正的循环依赖:

var obsA,
    obsB,
    aRef,
    bRef;

aRef = Rx.Observable.defer(function () {
    return obsA;
});

bRef = Rx.Observable.defer(function () {
    return obsB;
});

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(bRef, function (a, b) {
            return a + b;
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(aRef, function (b, a) {
        return b + a;
    });

obsA.subscribe();
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

现在这是一个真正的循环依赖。不幸的是,您仍然会遇到相同的错误,尽管堆栈跟踪要深得多:

RangeError: Maximum call stack size exceeded.
/* ... stack ... */

没有万无一失的方法来检测周期。您可以将 observables 包装在一个新的 observable 中并检测对您的 subscribe 方法的递归调用。subscribeOn但是,如果底层的 observables 正在使用或publish其他concat任何延迟实际循环订阅的东西,那么这样的算法就会失败。

我最好的建议是附加一个catch检查范围错误并用更好的错误替换它的子句:

var obsA,
    obsB,
    aRef,
    bRef;

aRef = Rx.Observable.defer(function () {
    return obsA;
});

bRef = Rx.Observable.defer(function () {
    return obsB;
});

obsA = Rx.Observable
    .returnValue(42)
    .combineLatest(bRef, function (a, b) {
            return a + b;
    })
    .catch(function (e) {
        var isStackError = e instanceof RangeError && e.message === 'Maximum call stack size exceeded';
    
        return Rx.Observable.throw(isStackError ? new Error('Invalid, possibly circular observables.') : e);
    });

obsB = Rx.Observable
    .returnValue(42)
    .combineLatest(aRef, function (b, a) {
        return b + a;
    })
    .catch(function (e) {
        var isStackError = e instanceof RangeError && e.message === 'Maximum call stack size exceeded';
    
        return Rx.Observable.throw(isStackError ? new Error('Invalid, possibly circular observables.') : e);
    });

obsA.subscribe();
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

于 2013-07-01T02:32:33.177 回答