0

I am trying to control the inflow for a slow subscriber. Tried the below in NodeJS

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var commJson = xmlNodeStream.bufferWithCount(2).publish();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});

commJson.connect();

When I run the above code, I would expect the slow subscriber to pause for 5 seconds everytime before next data-batch is received.

But that is not happening. After an initial 5 second delay, all data is flooded to slowSubscriber in batches of 2.

What is the right way to control the inflow so that slow subscibers can take their time (and preferably fast ones can wait for the slow ones to complete) ?

4

1 回答 1

3

它不会暂停,因为它setTimeout不会阻止执行,它只是将工作安排在稍后完成,即 2 秒后,然后有更多数据进入,它被安排为 2 秒 + 从现在开始的一些微小增量。结果是快速订阅者和慢速订阅者将同时完成,但慢速订阅者的结果要在 2 秒后才能可视化。

如果您的实际用例中的慢速订阅者确实是非阻塞的,那么您有两种选择来控制事件流,或者您需要控制来自消息源的流,无论可能在哪里。或者您需要使用其中一种背压运算符,例如controlled()

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { 
                console.log("============\nSlowsub called: ", x); 
                controller.request(1);
               }, 5000);
});

commJson.request(1);
于 2015-08-19T15:22:48.160 回答