0

对于每一种类型 T,都有一个包含 n 个 Observable 的 T 列表,我想构建一个 Observable,每次原始 Observable 中的一个发出某些东西时,它会发出一个包含 n 个 T 的列表。这通常被称为函数文献中的“序列”运算符。

伪语法中的期望行为示例:

val o1 = BehaviourSubject.create(true)
val o2 = BehaviourSubject.create(false)
val listOfObservables = [o1,o2]

val observableOfList = sequence(listOfObservables)

observableOfList.subscribe(print)

o2.onNext(true)

// Expected output:
// [true, false]
// [true, true]

我在java中编写了以下幼稚的实现,它行为不端:

public static <T> Observable<List<T>> sequence(List<Observable<T>> from) {
  return fold(from, Observable.<List<T>>never().startWith(new ArrayList<T>()),
            (arrayListObservable, observable) -> {
    return Observable.combineLatest(arrayListObservable, observable, (ts, t) -> {
      ts.add(t);
      return ts;
    });
  });
}

public static <F, T> T fold(final Iterable<? extends F> elements, final T zero, final Func2<T, F, T> f) {
  T currentValue = zero;
  for (final F element : elements) {
    currentValue = f.call(currentValue, element);
  }
  return currentValue;
}

// Actual output
// [true, false]
// [true, false, true]

不知何故,我需要重建结果列表,而不是将新值附加到现有列表中。你们会怎么做?

感谢您的时间和未来的答案!

4

1 回答 1

2

Rxx有一个重载,CombineLatest它需要一组 observables 并完全按照您的意愿行事。您应该能够相当轻松地将该方法的源代码移植到 Java。

于 2014-05-15T16:34:20.463 回答