例如,如果您有这样的源 Observable:
const Rx = require('rxjs/Rx');
const Observable = Rx.Observable;
const TIMEOUT = 1000;
const source = Observable.range(1, 20)
.concatMap(v => Observable.of(v).delay(Math.random() * 2000));
然后你可以使用缓冲它的值scan
。要重置我正在使用的缓冲区.merge(bufferNotifier.mapTo(null))
。然后switchMap()
我总是等待 1000 毫秒forkJoin()
来发射。如果不是,它会被另一个 Observable “覆盖”,因为新缓冲区到达:
const bufferNotifier = new Subject();
const chain = source
.do(undefined, undefined, () => bufferNotifier.complete()) // properly complete the chain
.merge(bufferNotifier.mapTo(null)) // reset buffer Subject
.scan((acc, val) => {
if (val === null) {
return [];
}
acc.push(val);
return acc;
}, [])
.filter(arr => arr.length > 0)
.switchMap(buffer => { // wait 1s until emitting the buffer further
return Observable.forkJoin(
Observable.of(buffer),
Observable.timer(1000).take(1),
arr => arr
);
})
.do(() => bufferNotifier.next()) // trigger reset the buffer
.subscribe(console.log);
这输出例如:
[ 1 ]
[ 2 ]
[ 3, 4 ]
[ 5 ]
[ 6, 7 ]
[ 8, 9, 10, 11, 12 ]
[ 13 ]
[ 14, 15 ]
[ 16 ]
[ 17 ]
[ 18 ]
[ 19, 20 ]