1

我正在尝试轮询 REST API 以更新使用以下代码可以正常工作的数据表:

pollData(url, interval) {
  return Rx.Observable.interval(interval)
    .mergeMap(() => this.http.get(url));
}

// get data
this.dataService.pollData(this.url, this.updateInterval)
            .subscribe(
                data => console.log(data),
                err => console.log(err),
                () => console.log('done'));

问题是错误和完成永远不会被调用。任何关于使用 onError 和 onCompete 的建议将不胜感激。谢谢!

4

2 回答 2

3

关于onComplete对观察者的调用,只有在源可观察对象完成时才会生效。这意味着当pollData完成返回的可观察对象时。由于您当前正在轮询没有退出条件,那么您的 observable 自然永远不会完成。

要完成这个 observable,你需要提出一个退出条件:

  • 超时(例如,轮询 X 秒,然后停止轮询)
  • 投票次数
  • 基于 pollData 的条件(例如,如果在 X 次连续轮询后未检测到更改)
  • 外部完成信号
  • 对您的用例有意义的任何其他条件

所有这些条件都很容易用 RxJS 实现,因为它们需要你更新pollData函数的代码。

例如对于外部完成信号,您可以编写:

// defining somewhere the subject for signalling end of polling
stopPollingS = new Rx.Subject();

// somehow pass this subject as a parameter of the polling function
pollData(url, interval, stopPollingS) {
return Rx.Observable
         .interval(interval)
         .mergeMap(() => this.http.get(url))
         .takeUntil(stopPollingS);
}

// somewhere in your code when you want to stop polling
stopPollingS.onNext(true);

关于onError观察员的电话,我不确定我是否了解正在发生的事情。您是否尝试过引发错误并检查onError您的观察者的处理程序是否确实被调用?如果没有错误,很明显onError不会调用 。

于 2016-02-01T05:14:31.280 回答
1

以防万一有人想知道我是如何解决这个问题并实现所需的功能的。基本上我只需要将 observable 包装在另一个中并将错误作为数据返回。

initiatePolling(url, interval) {

    var http = this.http;

    return Rx.Observable.create(function (observer) {

        // initial request (no delay)
        requestData();
        var timerId = setInterval(requestData, interval);

        function requestData() {
            var subscription = http.get(url).timeout(20000)
                .subscribe(
                    result => {
                        observer.next(result);
                        subscription.unsubscribe();
                    },
                    err => {
                        observer.next(err);
                        subscription.unsubscribe();
                    },
                    () => {
                        subscription.unsubscribe();
                    });
        }

        return function () {
            observer.complete();
            window.clearInterval(timerId);
        }
    });
}
于 2016-02-12T01:01:39.503 回答