1

一般来说,我对 RxJava 和 FP 还是很陌生。我想写一个代码来加入两个Observables。假设我们有两组整数:

  • [0..4]以 键选择器为模2,给出(key, value) = {(0,0), (1,1), (0,2),...}
  • [0..9]以 键选择器为模3,给出(key, value) = {(0,0), (1,1), (2,2), (0,3), (1,4),...}

我加入他们的步骤如下:

  1. 按其键对每个集合进行分组。第一组使用键0和创建两个组1。第二个使用键创建三个组0,12
  2. 制作两组组的笛卡尔积,总共给出 6 对组,键为:0-0, 0-1, 0-2, 1-0, 1-1, 1-2
  3. 只过滤那些两边都有相同键的对,只留下0-01-1
  4. 在每一对中,制作左右组的笛卡尔积。

下面是计算笛卡尔积的辅助类:

public class Cross<TLeft, TRight, R> implements Observable.Transformer<TLeft, R> {
    private Observable<TRight>      _right;
    private Func2<TLeft, TRight, R> _resultSelector;

    public Cross(Observable<TRight> right, Func2<TLeft, TRight, R> resultSelector) {
        _right = right;
        _resultSelector = resultSelector;
    }

    @Override
    public Observable<R> call(Observable<TLeft> left) {
        return left.flatMap(l -> _right.map(r -> _resultSelector.call(l, r)));
    }
}

这是加入的代码:

Observable.range(0, 5).groupBy(i -> i % 2)
        .compose(new Cross<>(Observable.range(0, 10).groupBy(i -> i % 3), ImmutablePair::new))
        .filter(pair -> pair.left.getKey().equals(pair.right.getKey()))
        .flatMap(pair -> pair.left.compose(new Cross<>(pair.right, ImmutablePair::new)))
        .subscribe(System.out::println);

但是,输出不正确:

(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)

如果我删除包含 的行filter,则根本没有结果。正确的输出应该就像运行这个:

Observable.range(0, 5)
        .compose(new Cross<>(Observable.range(0, 10), ImmutablePair::new))
        .filter(pair -> pair.left % 2 == pair.right % 3)
        .subscribe(System.out::println);

这使:

(0,0)
(0,3)
(0,6)
(0,9)
(1,1)
(1,4)
(1,7)
(2,0)
(2,3)
(2,6)
(2,9)
(3,1)
(3,4)
(3,7)
(4,0)
(4,3)
(4,6)
(4,9)

有人可以解释这种行为吗?非常感谢。

注意:我使用org.apache.commons.lang3.tuple.ImmutablePair以防您想知道。

4

1 回答 1

2

问题是此设置尝试多次订阅一个组,这是不允许的。你会看到subscribe(System.out::println, Throwable::printStackTrace);超载的异常,总是建议使用另一个。这是一个固定示例,它允许以另一层 ImmutablePair 为代价重用:

Func1<Integer, Integer> m2 = i -> i % 2;
Func1<Integer, Integer> m3 = i -> i % 3;

Observable<ImmutablePair<Integer, Observable<Integer>>> g2 = 
        Observable.range(0, 5).groupBy(m2).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));
Observable<ImmutablePair<Integer, Observable<Integer>>> g3 = 
        Observable.range(0, 10).groupBy(m3).map(g -> new ImmutablePair<>(g.getKey(), g.cache()));

Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x1 
= g2.compose(new Cross<>(g3, ImmutablePair::new));

Observable<ImmutablePair<ImmutablePair<Integer, Observable<Integer>>, ImmutablePair<Integer, Observable<Integer>>>> x2 
= x1.filter(pair -> pair.left.getKey().equals(pair.right.getKey()));


Observable<ImmutablePair<Integer, Integer>> o = x2.flatMap(pair -> 
pair.left.right.compose(new Cross<>(pair.right.right, ImmutablePair::new)));

o.subscribe(System.out::println, Throwable::printStackTrace);

(对于长类型,我很抱歉,如果我尝试内联它们而不是使用局部变量,Eclipse 会出现各种推理问题)

于 2015-11-25T10:51:12.893 回答