8

我正在尝试使用 RxJS 进行简单的简短投票。delay它需要每秒向服务器上的位置发出一次请求path,一旦达到以下两个条件之一就结束:回调isComplete(data)返回 true 或者它已经尝试了服务器超过maxTries. 这是基本代码:

newShortPoll(path, maxTries, delay, isComplete) {
    return Observable.interval(delay)
    .take(maxTries)
    .flatMap((tryNumber) => http.get(path))
    .doWhile((data) => !isComplete(data));
  }

但是,在 RxJS 5.0 中不存在 doWhile,因此只能尝试服务器的条件maxTries有效,这要归功于 take() 调用,但isComplete条件不适用。我怎样才能做到这一点,所以 observable 将 next() 值,直到 isComplete 返回 true,此时它将 next() 该值和 complete()。

我应该注意到这takeWhile()对我不起作用。它不返回最后一个值,这实际上是最重要的,因为那是我们知道它完成的时候。

谢谢!

4

3 回答 3

3

我们可以创建一个实用函数来创建第二个 Observable,它会发出内部 Observable 发出的每个项目;但是,一旦满足我们的条件,我们将调用 onCompleted 函数:

function takeUntilInclusive(inner$, predicate) {
    return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(item => {
            observer.onNext(item);

            if (predicate(item)) {
                observer.onCompleted();
            }
        }, observer.onError, observer.onCompleted);


        return () => {
            subscription.dispose();
        }
    });
}

这是使用我们新的实用程序方法的快速片段:

const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));

// >> 0
// >> 1
// >> 2
// >> 3

这个答案基于:RX Observable.TakeWhile 在每个元素之前检查条件,但我需要在之后执行检查

于 2016-03-04T15:46:37.107 回答
1

您可以通过使用retryfirst运算符来实现此目的。

// helper observable that can return incomplete/complete data or fail.
var server = Rx.Observable.create(function (observer) {
  var x = Math.random();

  if(x < 0.1) {
    observer.next(true);
  } else if (x < 0.5) {
    observer.error("error");
  } else {
    observer.next(false);
  }
  observer.complete();

  return function () {
  };
});
   
function isComplete(data) {
  return data;
}
  
var delay = 1000;
Rx.Observable.interval(delay)
  .switchMap(() => {
    return server
      .do((data) => {
        console.log('Server returned ' + data);
      }, () => {
        console.log('Server threw');
      })
      .retry(3);
  })
  .first((data) => isComplete(data))
  .subscribe(() => {
    console.log('Got completed value');
  }, () => {
    console.log('Got error');
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

于 2017-01-24T15:42:44.333 回答
0

这是一个老问题,但我还必须轮询一个端点并得出这个问题。doWhile这是我最终创建的我自己的运算符:

import { pipe, from } from 'rxjs';
import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

export function doWhile<T>(shouldContinue: (a: T) => boolean) {
  return pipe(
    switchMap((data: T) => from([
      { data, continue: true },
      { data, continue: shouldContinue(data), exclude: true }
    ])),
    takeWhile(message => message.continue),
    filter(message => !message.exclude),
    map(message => message.data)
  );
}

这有点奇怪,但到目前为止它对我有用。您可以将它与take您尝试的方式一起使用。

于 2018-12-20T21:15:09.710 回答