我在 Angular 2 项目中使用RxJs 版本 5 。我想创建一些可观察对象,但我不希望立即调用可观察对象。
在版本 4中,您可以使用(例如)受控命令或Pausable Buffers来控制调用。但该功能在版本 5 中(尚不)可用。
如何在 RxJs 5 中获得这种功能?
我的最终目标是将创建的可观察对象排队并一一调用。只有在前一个处理成功时才会调用下一个。当一个失败时,队列被清空。
编辑
借助@Niklas Fasching 的评论,我可以使用发布操作创建一个可行的解决方案。
// Queue to queue operations
const queue = [];
// Just a function to create Observers
function createObserver(id): Observer {
return {
next: function (x) {
console.log('Next: ' + id + x);
},
error: function (err) {
console.log('Error: ' + err);
},
complete: function () {
console.log('Completed');
}
};
};
// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {
console.log('add ' + name);
// Create an async operation
var observable = Rx.Observable.create(observer => {
// Some async operation
setTimeout(() =>
observer.next(' Done'),
500);
});
// Hold the operation
var published = observable.publish();
// Add Global subscribe
published.subscribe(createObserver('Global'));
// Add it to the queue
queue.push(published);
// Return the published so the caller could add a subscribe
return published;
};
// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));
// Dequeue and run the first
queue.shift().connect();