3

我有一些代码可以轮询直到任务完成

见下文

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

我曾尝试使用 takeUntil 和 takeWhile,但问题是一旦任务完成,最后一个值就永远不会发布。

为了解决这个问题,我必须在 stopPoll 主题中包含 tap 方法,并增加 stopCount 以获取最后一个值。

所以上面的工作,但感觉有点乱,我敢肯定一定有更好的方法来实现这一点?

我本来希望 takeUntil 发布最后一个值或有一个覆盖告诉它,例如 takeUntil(observable, {publishLast: true})

顺便说一句更新,可观察到的订阅由 Angular 6 模板提前谢谢

4

3 回答 3

2

您可以做的一件事是使用自定义的 takeWhile-like 运算符,如下所示:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

将其视为 takeWhite 的变体似乎不是一个好主意,因为它不仅在条件成立时获取值,而且还会发出额外的值。

一个更优雅的解决方案可能是让模拟状态 observable 发出两种值:下一个通知和完成通知,类似于物化/非物化操作符的工作方式。

于 2018-08-10T17:30:09.877 回答
1

如果你想完成 observable,你也可以使用 next() 创建主题和发射。

this.stopPoll: Subject<any> = new Subject<any>();

如果您想完成订阅。你可以调用 this.stopPoll.next(true);

您可以访问 subscribe() 中的数据

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
于 2018-08-09T12:42:24.277 回答
1

与此同时,这已在 rxjs 中实现为takeWhile(condition, ?inclusive)

timer(0, 10).pipe(
    takeWhile((x) => x < 3, true)
)

发出 0, 1, 2, 3

于 2021-06-23T14:17:33.720 回答