我写了一些代码,应该可以满足您的大部分需求。基本上,我创建了一个函数 `zipAndContinue`,它的行为与 `zip` 类似,但只要某些底层流仍有数据要发出,它就会继续发出项目。此函数仅使用冷可观察对象(cold observable)进行过[简要]测试。
此外,欢迎更正/增强/编辑。
/**
 * Zips the given observables like `zip`, but keeps emitting tuples for as
 * long as at least one source still has values to emit. Positions that belong
 * to a source that has already completed are filled with `null`.
 *
 * NOTE(review): `null` doubles as the "source finished" sentinel, so a source
 * that legitimately emits `null` cannot be told apart from a finished one.
 * NOTE(review): both `first$` and `subsequent$` derive from `combined$`;
 * presumably this relies on the sources being cold (replayable) — verify
 * before using it with hot observables.
 *
 * @param {...Rx.Observable} sources - The observables to zip together.
 * @returns {Rx.Observable} An observable of arrays, one slot per source.
 */
function zipAndContinue(...sources) {
  // With no sources there is nothing to zip; bail out early, because an empty
  // argument list would otherwise pass a count of 0 to bufferWithCount below.
  if (sources.length === 0) {
    return Rx.Observable.empty();
  }

  // Augment each observable so it ends with null
  const observables = sources.map(x => endWithNull(x));
  const combined$ = Rx.Observable.combineLatest(observables);

  // The first item from the combined stream is our first 'zipped' item
  const first$ = combined$.first();

  // We calculate subsequent 'zipped' items by only grabbing
  // the items from the buffer that have all of the required updated
  // items (remember, combineLatest emits each time any of the streams
  // updates).
  const subsequent$ = combined$
    .skip(1)
    .bufferWithCount(sources.length)
    .flatMap(zipped);

  // Concatenate the two streams, then drop tuples made only of nulls. The
  // filter runs on the concatenated output so that it also covers the first
  // tuple, which is all-null when every source is empty.
  return first$
    .concat(subsequent$)
    .filter(xs => !xs.every(x => x === null));
}
以下是使用的实用函数:
/**
 * Wraps an observable so that one extra `null` value is emitted immediately
 * before the completion notification. Values and errors are forwarded as-is.
 *
 * @param {Rx.Observable} observable - The source to wrap.
 * @returns {Rx.Observable} A new observable created via `Rx.Observable.create`.
 */
function endWithNull(observable) {
  return Rx.Observable.create(function subscribeWithSentinel(downstream) {
    const relay = {
      onNext(value) {
        downstream.onNext(value);
      },
      onError(error) {
        downstream.onError(error);
      },
      onCompleted() {
        // Emit the null sentinel first, then signal completion.
        downstream.onNext(null);
        downstream.onCompleted();
      },
    };

    // Return whatever the upstream subscribe call returns.
    return observable.subscribe(relay);
  });
}
/**
 * Chooses which buffered `combineLatest` snapshots to forward as 'zipped'
 * tuples.
 *
 * @param {Array<Array<*>>} xs - A buffer of snapshots. Each snapshot is an
 *   array of values in which `null` entries are not counted as live values.
 * @returns {Rx.Observable} An observable over the snapshots that remain after
 *   skipping the leading ones; empty when the buffer is empty or no snapshot
 *   holds a non-null value.
 */
function zipped(xs) {
  // For every snapshot, count the positions that still hold a non-null value.
  const nonNullCounts = xs.map(snapshot => snapshot.filter(x => x !== null).length);

  // Largest non-null count in any one snapshot, taken as the number of
  // streams that are still emitting. The leading 0 keeps an empty buffer from
  // producing -Infinity, so it takes the empty branch below.
  const stillEmitting = Math.max(0, ...nonNullCounts);
  if (stillEmitting === 0) {
    return Rx.Observable.empty();
  }

  // Skip any intermittent results
  return Rx.Observable.from(xs).skip(stillEmitting - 1);
}
这是示例用法:
// Three sources of different lengths (6, 1 and 2 items).
const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);

// Zip them and log each emitted tuple.
const result$ = zipAndContinue(one$, two$, three$);
result$.subscribe(tuple => console.log(tuple));

// Expected console output:
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
这是一个 js-fiddle(您可以单击运行,然后打开控制台):https://jsfiddle.net/ptx4g6wd/