3

在 RxJs 中有一个称为 Observable.expand 的方法,它可以使用转换函数递归地扩展序列。

例如,

Rx.Observable.return(0).expand(function (x) { return Rx.Observable.return(x+1); }) 

将发出所有整数

但是,我在 RxJava 中找不到这种方法。RxJava 中是否还有其他方法可以实现类似的目标?

有关 exapand() 的更详细规范,请参阅https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/expand.md

4

2 回答 2

4

好吧,您给出的示例可以简化为:

Observable.range(0, Integer.MAX_VALUE)

但我假设你实际上想做一些更复杂的事情。scan与您正在寻找的内容不同,但它可以做类似的事情,我们可以使用它来制作一个Transformer您可以重复使用的类似来扩展。

的主要区别scan在于它在每一步都需要一个新的输入值,但是,就像展开一样,它保留了以前的值。我们可以通过忽略新的输入值来解决这个问题。因为扫描类似于扩展,所以我将从一个scan有一些相当大缺陷的示例开始,然后探索一个更好的选择。

public class Expand<T, T> implements Transformer<T, T> {
  private final Func1<T, T> expandFunc;

  public Expand(final Func1<T, T> expandFunc) {
    this.initialValue = initialValue;
    this.expandFunc = expandFunc;
  }

  @Override
  public Observable<T> call(Observable<T> source) {
    // Here we treat emissions from the source as a new 'initial value'.

    // NOTE: This will effectively only allow one input from the source, since the
    // output Observable expands infinitely. If you want it to switch to a new expanded
    // observable each time the source emits, use switchMap instead of concatMap.
    return source.concatMap(new Func1<T, Observable<T>>() {

      @Override
      public Observable<T> call(T initialValue) {
        // Make an infinite null Observable, just for our next-step signal.
        return Observable.<Void>just(null).repeat()
            .scan(initialValue, new Func2<T, Void, T>() {

              @Override
              public T call(final T currentValue, final Void unusedSignal) {
                return expandFunc.call(currentValue);
              }

            });
      }

    });
  }
}

要使用该转换器,让我们创建一个获取当前数字、加 1 并对其平方的方法。

Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

});

无论如何,您可能已经注意到这种方法的一些主要尴尬点。首先,有 switch 与 concatMap 的事情,以及这本质上如何将一个项目从 Observable 输出变为无限链。其次,整个 'Void' 信号 Observable 不应该是必要的。当然,我们可以使用rangeorjust(1).repeat()或许多其他东西,但它们最终还是被扔掉了。

这是一种可以更清晰、更递归地对其建模的方法。

public static <T> Observable<T> expandObservable(
    final T initialValue, final Func1<T, T> expandFunc) {
  return Observable.just(initialValue)
      .concatWith(Observable.defer(new Func0<Observable<T>>() {

        @Override
        public Observable<T> call() {
          return expandObservable(expandFunc.call(initialValue), expandFunc);
        }

      });
}

因此,在此示例中,每个递归传递都输出当前值(在每个步骤中扩展,并与下一步连接。defer用于防止立即发生无限递归,因为它不会调用代码来创建 Observable 直到它订阅了。使用它看起来像这样:

expandObservable(1, new Func1<Integer, Integer>() {

  @Override
  public Integer call(final Integer input) {
    final Integer output = input + 1;
    return output * output;
  }

}).subscribe(/** do whatever */);

所以,很像这个compose例子,但是一个更整洁和更清洁的实现。

希望有帮助。

于 2015-01-26T00:23:42.467 回答
-1

我想你正在寻找map

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#map

numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.map({it * it}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);

1
4
9
16
25
Sequence complete
于 2014-11-14T12:05:44.393 回答