3

我有一个 Observable ,source它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个每 500 毫秒可靠地发出其值的 Observable。

假设source在这些时间发出值:

  • 100 毫秒 - 第一项
  • 980ms - 第二项
  • 1020ms - 第三项
  • 1300ms - 第四个项目,等等。

我想“平滑”这个流,以便得到如下输出:

  • 500 毫秒 - 第一项
  • 1000 毫秒 - 第二项
  • 1500ms - 第三项
  • 2000ms - 第四项

一种天真的方法可能是在源项目的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。

我尝试了, 和的各种组合.timer(),但没有任何希望。.interval().flatMap()

4

3 回答 3

2

我想你可以试试这个:

const src$ = merge(
  timer(100).pipe(mapTo(1)),
  timer(980).pipe(mapTo(2)),
  timer(1020).pipe(mapTo(3)),
  timer(1300).pipe(mapTo(4))
);

src$
  .pipe(
    bufferTime(500),
    mergeAll()
  )
  .subscribe(console.log);

bufferTime用于创建一个计时器,该计时器将以恒定的时间间隔发射,而与发射的值无关。然后mergeAll用于爆炸产生的数组bufferTime

StackBlitz 演示

于 2021-01-23T09:06:37.963 回答
1

您可以使用combineLatest,intervalthrottle- 您添加第二个 observable的组合,以及interval您想要的调用之间的时间(例如 500 毫秒),因此您的 observable 将每 500 毫秒发出一次(当与 一起使用时combineLatest),现在它将每 500 毫秒每个原始source发射的时间,因此您可以添加throttle管道,这将导致间隔节流:

combineLatest([source, timer(5000)])
  .pipe(
    throttle(() => interval(5000)),
    tap(([value]) => {
      console.log("emitted", value, new Date().getSeconds());
    })
  )
  .subscribe();

tap这里不需要,只是为了演示而添加)

于 2021-01-23T07:21:54.530 回答
1

对于发射速度比您的时间间隔更快的源

zip您的来源具有interval所需的时间跨度。

zip(source, interval(500)).pipe(
  map(([value, _]) => value)  // only emit the source value
)

在此处输入图像描述

zip从 的第一个项目发出第source一个项目interval,然后从第二个项目发出source第二个项目interval,依此类推。如果输出 observable 只应在发出时interval发出,则第 N 个值 from必须在第 N 个值 from之前source到达。interval

潜在问题: 如果您的source发射速度比interval某个时间点慢(即第 N 个值 from在第 N个值 from之后source到达),那么将直接发射而不等待下一次发射。intervalzipinterval

// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
zip output:   -----1-----2-----3-----4--------5----6-----
                   ✓     ✓     ✓     ✓        ⚠️    ⚠️
// emits 5 and 6 don't happen when interval emits

对于以任何速率发射的源

function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),
        scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue), // combine with the latest buffer
        takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
        filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
        map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
      );
    });
}

source.pipe(
  emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
output:       -----1-----2-----3-----4-----------5-----6-
                   ✓     ✓     ✓     ✓           ✓     ✓   
// all output emits happen when interval emits

https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts

于 2021-01-23T16:00:52.400 回答