I am trying to control the inflow for a slow subscriber. I tried the following in Node.js:
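
    var Rx = require('rx'); // assuming the RxJS 4 `rx` package, which provides bufferWithCount

    var xmlNodeStream = Rx.Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

    // Group the values into batches of 2 and share one source between both subscribers.
    var commJson = xmlNodeStream.bufferWithCount(2).publish();

    var FastSubscriber = commJson.subscribe(
        function (x) { console.log('----------\nFastSub: onNext: %s', x); },
        function (e) { console.log('FastSub: onError: %s', e); },
        function () { console.log('FastSub: onCompleted'); });

    // The slow subscriber simulates 5 seconds of work per batch.
    var slowSubscriber = commJson.subscribe(function (x) {
        setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
    });

    // Nothing is emitted until the published observable is connected.
    commJson.connect();
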
When I run the above code, I expect the slow subscriber to pause for 5 seconds each time before it receives the next batch.
But that is not happening. After an initial 5-second delay, all of the data floods into slowSubscriber at once, in batches of 2.
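The timing I observe can be reproduced without Rx at all. The sketch below is only an illustration, under the assumption that the source calls onNext synchronously for every batch. The names `batches`, `events`, `slowOnNext` and `DELAY_MS` are made up for it, and the delay is shortened from 5000 ms so that it finishes quickly.

```javascript
// Hypothetical stand-in for the buffered stream: the batches of 2 from above.
var batches = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11]];
var DELAY_MS = 50; // shortened stand-in for the 5000 ms in the question
var events = [];   // records what happened, in order

// Same shape as the slow subscriber's onNext: it only schedules work and returns.
function slowOnNext(batch) {
    events.push('received ' + batch.join(','));
    setTimeout(function () {
        events.push('processed ' + batch.join(','));
        console.log('Slowsub called: ', batch);
    }, DELAY_MS);
}

// Assumption: the source pushes every batch synchronously, one after another.
batches.forEach(slowOnNext);

// At this point every batch has been received and no timer has fired yet.
console.log(events);
```

Because setTimeout returns immediately, every batch is handed over before the first timer fires, so all the timers expire at roughly the same moment. That appears to match the single initial delay followed by a flood.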
What is the right way to control the inflow so that slow subscribers can take their time (and, preferably, fast ones wait for the slow ones to finish)?
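To make the behaviour I am after concrete, here is a minimal plain-JavaScript sketch without Rx. `deliverSequentially` and the `done` callback are hypothetical names invented for this illustration, not an Rx API, and the delay is again shortened from 5000 ms. I am looking for the idiomatic Rx equivalent.

```javascript
// Hypothetical helper (not part of Rx): hands each batch to every subscriber
// and moves on to the next batch only after all of them have called done().
function deliverSequentially(batches, subscribers, onFinished) {
    var index = 0;

    function next() {
        if (index >= batches.length) {
            if (onFinished) { onFinished(); }
            return;
        }
        var batch = batches[index++];
        var pending = subscribers.length;
        if (pending === 0) { next(); return; }
        subscribers.forEach(function (subscriber) {
            subscriber(batch, function done() {
                pending -= 1;
                if (pending === 0) { next(); } // everyone finished this batch
            });
        });
    }

    next();
}

var DELAY_MS = 50; // shortened stand-in for the 5000 ms in the question

// Finishes immediately, but still has to wait for the slow one before the next batch.
function fastSubscriber(batch, done) {
    console.log('FastSub: onNext: %s', batch);
    done();
}

// Signals completion only after its simulated work is over.
function slowSubscriber(batch, done) {
    setTimeout(function () {
        console.log('Slowsub called: ', batch);
        done();
    }, DELAY_MS);
}

deliverSequentially([[1, 2], [3, 4], [5, 6]], [fastSubscriber, slowSubscriber], function () {
    console.log('all batches delivered');
});
```

Here the producer is pulled by the consumers instead of pushing at its own pace, which is the kind of flow control I would like to express with Rx operators.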