2

我正在寻找一种方法来定义观察者的顺序(?)。

@GET("/get_user_msgs")
Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params);

例如,我通过 Retrofit 创建的 Rest API 提供了一个 Observable。

在我的ListView观察Observable中。

api.getPrivateMessages(params).subscribe(new Observer());

我还有一个用于我的 Espresso 测试的 API 包装器,并且我在Observable那里订阅了相同的包装器。这种方式首先调用 API 包装器中的观察者,然后才ListView 调用观察者。

public class IdlingWrapper implements Api, IdlingResource { 
   ....

    public IdlingWrapper(Api realApi) {
        this.realApi = realApi;
    }

    ...

    public Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params); {
        counter.incrementAndGet();
        return wrapObservable(realApi.getPrivateMessages(params));
    }

    protected <T> Observable<T> wrapObservable(final Observable<PrivateMessagesResponse> observable) {
        //what to do here?
    }
}

有没有办法在所有其他人完成后强制通知一些观察者?或者类似的事情?

就像是

Observable observable = getObservable();
observable.subscribeAsLast(new LastObserver());
observable.subscribe(new ObserverA());
observable.subscribe(new ObserverB());

所以ObserverA会先通知,然后ObserverB才通知LastObserver

或任何其他方法,我可以找出所有注册观察员何时得到通知和完成。

4

3 回答 3

1

我不确定您在 IdlingWrapper 中尝试做什么,但我认为当前的实现非常脆弱。

我认为需要发生的最重要的事情是保证 observable 只能被调用一次。

这是一个快速实现来演示这一点以及我对 wrapObservable 的实现。

public class Test {

    private static int counter = 0;

    private static final List<Observable<?>> list = Collections.synchronizedList(new ArrayList<>());

    protected static <T> Observable<T> wrapObservable(final Observable<T> original) {
        // run atleast once???
        synchronized (list) {
            list.add(original);
        }

        return Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                synchronized (list) {
                    counter++;
                    if (!list.contains(original)) {
                        subscriber.onError(new Exception("You can only subscribe once!"));
                        return;
                    }
                    list.remove(original);
                }

                // Sleep to make it easier to see things happening...
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ignored) {
                }

                subscriber.onCompleted();
            }
        }).flatMap(new Func1<Void, Observable<? extends T>>() {
            @Override
            public Observable<? extends T> call(Void o) {
                return original;
            }
        }).finallyDo(new Action0() {
            @Override
            public void call() {
                synchronized (list) {
                    counter--;
                    if (list.size() == 0 && counter == 0) {
                        System.err.println("finally");
                    }
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i = 0; i < 10; i++) {
            // running in io thread for simulating async call.
            Observable<String> test = wrapObservable(Observable.from("TEST!!!!!!")).subscribeOn(Schedulers.io());
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });

            // example of calling the same observable twice.
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });
        }

        Thread.sleep(10000);
    }
}
于 2014-05-08T23:54:18.223 回答
0

看来,这工作得很好。

protected <T> Observable<T> wrapObservable(final Observable<T> original) {
    return Observable.create(new Observable.OnSubscribeFunc<T>() {
        @Override
        public Subscription onSubscribe(final Observer<? super T> t1) {
            original.subscribe(new Observer<T>() {
                @Override
                public void onCompleted() {
                    t1.onCompleted();
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onError(Throwable e) {
                    t1.onError(e);
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onNext(T args) {
                    t1.onNext(args);
                }
            });

            return Subscriptions.empty();
        }
    });
}
于 2014-05-08T07:16:34.770 回答
0

如果您只想使用内置的 RxJava 方法对观察者进行排序,则可以使用 flatMap 和 range 将每个项目转换为多个项目,每个项目都有一个优先级,然后按优先级进行过滤。观察者是根据它们的过滤方式排序的。

这是一个简单的例子:

Observable<Pair<Integer, Object>> shared = RxView.clicks(findViewById(R.id.textView))
        .flatMap(c -> Observable.range(0, 2).map(i -> Pair.create(i, c)))
        .share();

shared.filter(p -> p.first == 1)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "first subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "first subscribed onNext"));

shared.filter(p -> p.first == 0)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "second subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "second subscribed onNext"));

如果你到处都这样做

于 2017-09-17T22:55:57.850 回答