好吧,您给出的示例可以简化为:
Observable.range(0, Integer.MAX_VALUE)
但我假设你实际上想做一些更复杂的事情。scan
与您正在寻找的内容不同,但它可以做类似的事情,我们可以使用它来制作一个Transformer
您可以重复使用的类似来扩展。
的主要区别scan
在于它在每一步都需要一个新的输入值,但是,就像展开一样,它保留了以前的值。我们可以通过忽略新的输入值来解决这个问题。因为扫描类似于扩展,所以我将从一个scan
有一些相当大缺陷的示例开始,然后探索一个更好的选择。
public class Expand<T, T> implements Transformer<T, T> {
private final Func1<T, T> expandFunc;
public Expand(final Func1<T, T> expandFunc) {
this.initialValue = initialValue;
this.expandFunc = expandFunc;
}
@Override
public Observable<T> call(Observable<T> source) {
// Here we treat emissions from the source as a new 'initial value'.
// NOTE: This will effectively only allow one input from the source, since the
// output Observable expands infinitely. If you want it to switch to a new expanded
// observable each time the source emits, use switchMap instead of concatMap.
return source.concatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T initialValue) {
// Make an infinite null Observable, just for our next-step signal.
return Observable.<Void>just(null).repeat()
.scan(initialValue, new Func2<T, Void, T>() {
@Override
public T call(final T currentValue, final Void unusedSignal) {
return expandFunc.call(currentValue);
}
});
}
});
}
}
要使用该转换器,让我们创建一个获取当前数字、加 1 并对其平方的方法。
Observable.just(1).compose(new Expand(new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
});
无论如何,您可能已经注意到这种方法的一些主要尴尬点。首先,有 switch 与 concatMap 的事情,以及这本质上如何将一个项目从 Observable 输出变为无限链。其次,整个 'Void' 信号 Observable 不应该是必要的。当然,我们可以使用range
orjust(1).repeat()
或许多其他东西,但它们最终还是被扔掉了。
这是一种可以更清晰、更递归地对其建模的方法。
public static <T> Observable<T> expandObservable(
final T initialValue, final Func1<T, T> expandFunc) {
return Observable.just(initialValue)
.concatWith(Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
return expandObservable(expandFunc.call(initialValue), expandFunc);
}
});
}
因此,在此示例中,每个递归传递都输出当前值(在每个步骤中扩展,并与下一步连接。defer
用于防止立即发生无限递归,因为它不会调用代码来创建 Observable 直到它订阅了。使用它看起来像这样:
expandObservable(1, new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer input) {
final Integer output = input + 1;
return output * output;
}
}).subscribe(/** do whatever */);
所以,很像这个compose
例子,但是一个更整洁和更清洁的实现。
希望有帮助。