Let's say I'm observing an observable in a very specific way.

    resultObservable = anotherObservable.filter(~Filter code~).take(15);  

I'd like to create a custom operator that combines two predefined operators, such as filter and take, so that it behaves like

    resultObservable = anotherObservable.lift(new FilterAndTake(15));  

or...

    resultObservable = anotherObservable.FilterAndTake(15);  

So far I'm comfortable writing a very specific operator that does this, and I can lift that operator.

But given my currently limited knowledge of RxJava, this would mean rewriting the take and filter functionality every time I need it in a custom operator.

That would work, but I'd rather reuse existing operators that are maintained by an open-source community, as well as recycle operators I've created myself.

Something also tells me I lack adequate knowledge about operators and subscribers.

Can someone recommend tutorials other than the RxJava documentation?
I ask because, while the docs explain the general concepts, they present each concept in isolation and leave out examples that would inspire more robust applications of RxJava.

So essentially:

I'm trying to encapsulate custom data flows into representative operators. Does this functionality exist?

2 Answers

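I'm not aware of any special function (or sugar) for composing Operator objects, but you can simply create a new Operator that combines existing ones. Here is a working example of a FilterAndTake operator: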

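    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Func1;
    // Internal RxJava 1.x classes (assumed package); they are not public API and may move or disappear.
    import rx.internal.operators.OperatorFilter;
    import rx.internal.operators.OperatorTake;

    public class FilterAndTake<T> implements Observable.Operator<T, T> {

        private final OperatorFilter<T> filter;
        private final OperatorTake<T> take;

        public FilterAndTake(Func1<? super T, Boolean> predicate, int n) {
            this.filter = new OperatorFilter<T>(predicate);
            this.take = new OperatorTake<T>(n);
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            // Wrap from downstream to upstream: take feeds the child, filter feeds take.
            return filter.call(take.call(child));
        }
    }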

You can then use it as follows:

    public static void main(String[] args) {
        Observable<Integer> xs = Observable.range(1, 8);

        // Keep only even numbers.
        Func1<Integer, Boolean> predicate = new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer x) {
                return x % 2 == 0;
            }
        };

        // Print each value that reaches the subscriber (Action1 is rx.functions.Action1).
        Action1<Integer> action = new Action1<Integer>() {
            @Override
            public void call(Integer x) {
                System.out.println("> " + x);
            }
        };

        // Even numbers from 1..8, limited to the first two: prints "> 2" then "> 4".
        xs.lift(new FilterAndTake<Integer>(predicate, 2)).subscribe(action);
    }
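If the wrapping order in `call` looks backwards, a dependency-free sketch may help. It is not RxJava: `Sink` and `Op` are hypothetical stand-ins for `Subscriber` and `Observable.Operator`, reduced to `onNext` only (no completion, errors, unsubscription, or backpressure), and the `filter`, `take`, and `demo` helpers are made up for illustration. It only shows why `take` has to wrap the child first and `filter` has to wrap `take`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class OperatorChainSketch {

    /** Hypothetical stand-in for rx.Subscriber, reduced to onNext. */
    interface Sink<T> {
        void onNext(T value);
    }

    /** Hypothetical stand-in for Observable.Operator<T, T>: wraps the downstream sink. */
    interface Op<T> {
        Sink<T> call(Sink<T> child);
    }

    /** Forwards only the values that match the predicate. */
    static <T> Op<T> filter(Predicate<? super T> predicate) {
        return child -> value -> {
            if (predicate.test(value)) {
                child.onNext(value);
            }
        };
    }

    /** Forwards the first n values and drops the rest (a real take would also unsubscribe). */
    static <T> Op<T> take(int n) {
        return child -> new Sink<T>() {
            private int seen = 0;

            @Override
            public void onNext(T value) {
                if (seen < n) {
                    seen++;
                    child.onNext(value);
                }
            }
        };
    }

    /** Same wrapping order as the answer: take wraps the child, filter wraps take. */
    static <T> Op<T> filterAndTake(Predicate<? super T> predicate, int n) {
        Op<T> filterOp = filter(predicate);
        Op<T> takeOp = take(n);
        return child -> filterOp.call(takeOp.call(child));
    }

    /** Pushes 1..8 through the combined operator and collects what reaches the end. */
    public static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Sink<Integer> upstream = OperatorChainSketch.<Integer>filterAndTake(x -> x % 2 == 0, 2)
                .call(received::add);
        for (int i = 1; i <= 8; i++) {
            upstream.onNext(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [2, 4]
    }
}
```

The source talks to the outermost wrapper, so the operator that should run first (filter) must be applied last. Swapping the two calls would take the first two values and then filter them, which for 1..8 leaves only 2.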
answered 2014-08-15T12:21:09.677
A bit late to the party, but this is what compose is for:

    Observable
        .from(....)
        .flatMap(... -> ....)
        .compose(filterAndTake(15))
        .subscribe(...)

    // Reusable transformer: Transformer here is Observable.Transformer (RxJava 1.x).
    public <T> Transformer<T, T> filterAndTake(int num) {
        return source -> source
            .filter(~Filter code~)
            .take(num);
    }
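As far as I know, `compose` does little more than hand the source to the transformer and return the result, so the pattern is plain function application. Here is a minimal runnable sketch of that idea using `java.util.stream` as a stand-in, not RxJava: `Stream` plays the role of `Observable`, `limit` the role of `take`, and the `compose` helper is hypothetical. The predicate is passed as a parameter here because the filter code above is left as a placeholder.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ComposeSketch {

    /** Reusable stage: filter, then limit. Analogous to filter(...).take(num). */
    public static <T> Function<Stream<T>, Stream<T>> filterAndTake(
            Predicate<? super T> predicate, int num) {
        return source -> source.filter(predicate).limit(num);
    }

    /** Hypothetical helper playing the role of compose: apply the stage to the source. */
    public static <T, R> Stream<R> compose(Stream<T> source,
                                           Function<Stream<T>, Stream<R>> stage) {
        return stage.apply(source);
    }

    /** Runs the stage over 1..8, keeping the first two even numbers. */
    public static List<Integer> demo() {
        Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
        return compose(source, ComposeSketch.<Integer>filterAndTake(x -> x % 2 == 0, 2))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [2, 4]
    }
}
```

The appeal of this shape is that the stage is built from the public operators, so nothing from the library's internals needs to be referenced or re-implemented.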
answered 2016-12-05T14:39:05.817