1

我在使 RxJS5 可观察流以我希望的方式运行时遇到问题。

流应该使用 axios 向网站发送 HTTP 请求,如果响应是 HTTP 错误(axios 强制为 JavaScript 错误),则可观察序列应等待 10 毫秒,然后尝试重新发送请求(出于某种原因当您立即重试发送请求并不断抛出错误时,我向其发送请求的网站不喜欢它,但大多数情况下表现良好,延迟 10 毫秒)。

Rx.Observable
  .fromPromise(axios('http://example.com/12345'))
  .map(x => new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(x)
    }, 2000)
  }))
  .debounceTime(2000)
  .do(console.log)
  .retry(10)
  .subscribe(console.log, console.error)

我有一个关于 Codepen 的示例,其中进行了一些更改,以便更清楚地了解流的工作原理:http ://codepen.io/leakyabstractions/pen/pNmvyZ?editors=0010

我尝试使用.delay(), .debounceTime(), .timer(), .timeInterval()and.timeout()代替.map()运算符,但没有任何.map()效果(包括 )。我究竟做错了什么?

4

2 回答 2

1

所以基本上你正在寻找的是“10ms后重试,但只有10次”?(这是你的retry(10)建议。我认为一个复杂的解决方案将包括retryWhen在这里:

const mockedRestError$ = Rx.Observable.throw("http://example.com/12345");

// we'll start with an empty string, because otherwhise
// we could not log the "start..."
Rx.Observable.of("")
  .do(() => console.log("start..."))
  .switchMapTo(mockedRestError$)
  .retryWhen(tenTimesWithDelay)
  .subscribe(console.log, console.error, console.info); // is never called, because 


function tenTimesWithDelay(errors) {
  return errors
    .scan((count, err) => {
      ++count;
      // optionally to throw the error all the way down to the subscription
      // comment the throw out to have the stream simply complete
      if (count >= 10) {
        throw err;
      }
      return count;
    }, 0)
    .takeWhile(count => count < 10)
    .do(count => console.log(`Retry #${count} in 100ms...`))
    .delay(100);
}

这是代码笔:http ://codepen.io/anon/pen/bBydwZ?editors=0010

另请注意,我将延迟设置为 100 毫秒而不是 10 毫秒,以便它在控制台中显示更清晰。

于 2016-12-23T16:15:10.147 回答
-1

olsn 的回答有效,但是我想分享另一个我不小心想到的解决方案,在我看来这更直接一些:

console.log('start')
Rx.Observable
// emit observable every 10 ms, change to a bigger number to observe results
.interval(10)
// start immediately
.startWith(0)
// do it 10 times
.take(10)
.do(x => console.log('emitting', x))
// for every observable emitted, do an HTTP request
.flatMap(() => new Promise((resolve, reject) => resolve('resolved promise')))
.first(x => !(x instanceof Error))
.subscribe(console.log, console.warn, () => console.info('done'))
于 2016-12-27T21:40:36.083 回答