对于发射速度比您的时间间隔更快的源
zip
您的来源具有interval
所需的时间跨度。
zip(source, interval(500)).pipe(
map(([value, _]) => value) // only emit the source value
)
zip
从 的第一个项目发出第source
一个项目interval
,然后从第二个项目发出source
第二个项目interval
,依此类推。如果输出 observable 只应在发出时interval
发出,则第 N 个值 from必须在第 N 个值 from之前source
到达。interval
潜在问题:
如果您的source
发射速度比interval
某个时间点慢(即第 N 个值 from在第 N个值 from之后source
到达),那么将直接发射而不等待下一次发射。interval
zip
interval
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
zip output: -----1-----2-----3-----4--------5----6-----
✓ ✓ ✓ ✓ ⚠️ ⚠️
// emits 5 and 6 don't happen when interval emits
对于以任何速率发射的源
function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
defer(() => {
let sourceCompleted = false;
const queue = source.pipe(
tap({ complete: () => (sourceCompleted = true) }),
scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
);
return interval(period).pipe(
withLatestFrom(queue), // combine with the latest buffer
takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
);
});
}
source.pipe(
emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
v v
source: -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
output: -----1-----2-----3-----4-----------5-----6-
✓ ✓ ✓ ✓ ✓ ✓
// all output emits happen when interval emits
https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts