4

在 Rx JS 的表面上,我得到了以下代码片段:

    var observer1 = Rx.Observer.create(
         function (x) {
             console.log('Next: ' + x);
         },
         function (err) {
             console.log('Error: ' + err);
         },
         function () {
             console.log('Completed');
         }
     );  

     var observer2 = Rx.Observer.create(
         function (x) {
             console.log('Next: ' + x); 
         },  
         function (err) {
             console.log('Error: ' + err);   
         },  
         function () {
             console.log('Completed');   
         }   
     );  


     var source1 = Rx.Observable.return(1);
     var source2 = Rx.Observable.return(2);

     var subscription1 = source1.subscribe(observer1);
     var subscription2 = source2.subscribe(observer1);

输出:下一个:1 已完成

JS BIN 代码参考:http: //goo.gl/DiHdWu

为两个流订阅同一个观察者只会从第一个流中产生数据。但是,当订阅其他观察者时,事情会按预期进行。有人可以解释发生了什么吗?

     var subscription1 = source1.subscribe(observer1);
     var subscription2 = source2.subscribe(observer2);

输出:下一个:1 已完成 下一个:2 已完成

4

1 回答 1

4

是的,观察者可以收听多个 Observable,但不能以您尝试使用的方式收听。这可以通过使用Merge,concat运算符来实现。代码参考jsbin

为什么你的代码不起作用?

我们IObserver每次调用Observer.create. 一旦 OnError 或 OnComplete 被调用,它将忽略任何未来的 OnNext 调用。

当我们使用一个观察者订阅多个 Observable 时,在第一个 observable 终止/完成后(即当它触发 OnError / OnCompleted 时),观察者将不再为任何订阅的 observable 工作。因为来自第一个 observable 的终止消息将导致观察者忽略来自任何进一步订阅的 observable 的消息。

为了使您的问题正常工作,您需要使用类似 的运算符mergeconcat它将在内部使用多个观察者,并且不会将来自除最后一个 Observable 之外的任何可观察对象的完成消息(OnError/OnCompleted)传递给外部观察者。

//Triggers observer1 for both observables(source1 & source2)
var subscription = source1.concat(source2).subscribe(observer1);

//Triggers observer2 for both observables(source1 & source2)
var subscription = source1.merge(source2).subscribe(observer2);
于 2015-05-12T12:58:34.667 回答