4

我正在使用 RxJava,并且Observable里面有多个项目。我想做的是在第一个项目上运行函数 A,在所有项目上运行函数 B,并在Observable完成时运行函数 C:

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C

有没有用 lambda 函数表达这一点的聪明方法?我已经有以下解决方案,但它看起来很难看,我怀疑有更好的方法来做到这一点:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(fax1);
            }
        },
        throwable -> {},
        () -> runC());
4

3 回答 3

8

用于Observable.defer封装每个订阅状态(作为一个布尔值,指示我们是否在第一条记录上)。

这是演示使用的可运行类:

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println);
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}

尽管 OP 询问 RxJava1,但上面的解决方案与 RxJava2 相同:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println);
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}
于 2015-09-03T04:07:57.143 回答
3

我想我自己找到了一个简单的解决方案:

Observable<Integer> observable = Observable.just(1, 2, 3).share();
observable.take(1).subscribe(this::runA);
observable.subscribe(
    this::runB,
    throwable -> {},
    this::runC);

这适用于单线程,它似乎也适用于多线程,但我不得不承认到目前为止我对此并不太有信心。

于 2015-09-03T11:16:27.613 回答
2

已经内置了doOnNextdoOnTerminate(或类似的),所以听起来所缺少的只是对第一个项目执行操作。这是一种方法。您可以发布您的流,然后在一个块中流正常进行,而在单独的订阅中,我们仅侦听第一个事件(带有first)并在收到它时执行操作。这是一个例子:

observable.publish(new Func1<Observable<Item>, Observable<Item>>() {
    @Override
    public Observable<Item> call(Observable<Item> itemObservable) {
        itemObservable.first().subscribe((Item item) -> runA(item));
        return itemObservable;
    }
}).subscribe(/* some actions and subscription as usual ... */);

如果这看起来太冗长,您可以将其放在实用程序 Transformer 中并保留一些构建器语法。例如:

public static class Transformers<T> implements Observable.Transformer<T, T> {
    private final Action1<T> action1;

    private Transformers(final Action1<T> action1) {
        this.action1 = action1;
    }
    // cover for generics
    public static <T> Observable.Transformer<T, T> doOnFirst(final Action1<T> action1) {
        return new Transformers<T>(action1);
    }

    @Override
    public Observable<T> call(final Observable<T> observable) {
        return observable.publish(new Func1<Observable<T>, Observable<T>>() {
            @Override
            public Observable<T> call(Observable<T> observable) {
                observable.first().subscribe(action1);
                return observable;
            }
        });
    }
}

然后你可以这样称呼它:

observable
    .compose(Transformers.doOnFirst((Item item) -> runA(item)))
    .subscribe(/* chain and subscribe as usual... */);

显然这一切使用 Lambda 语法看起来要好得多,以上是我猜测它的样子。

于 2015-09-02T17:07:13.920 回答