1
Observable observable = Observable.from(backToArray(downloadWebPage("URL")))
            .map(new Func1<String[], Pair<String[], String[]>>() {
                @Override
                public Pair<String[], String[]> call(String[] of) {
                    return new Pair<>(of,
                            backToArray(downloadWebPage("URL" + of[0])).get(0));
                }
            });

    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(
            (new Observer<Pair>() {

                @Override
                public void onCompleted() {
                    // Update user interface if needed
                }

                @Override
                public void onError(Throwable t) {
                    // Update user interface to handle error
                }

                @Override
                public void onNext(Pair p) {
                   offices.add(new Office((String[]) p.first, (String[]) p.second));
                }
           }));

这运行,我得到 android.os.NetworkOnMainThreadException。我希望它运行 subscribeOn() 方法设置的新线程。

4

3 回答 3

4

假设实际的网络请求发生在 中downloadWebPage(),则错误在代码的第一行:

Observable observable = Observable.from(backToArray(downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)))

这相当于:

String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode)

Observable observable = Observable.from(backToArray(response))

这应该清楚地表明,在创建downloadWebPage任何内容之前,在主线程上执行Observable,更不用说订阅了。RxJava 在这方面无法改变 Java 的语义。

但是,您可以做的是这样的事情(未经测试,但应该是正确的):

Observable observable = Observable.create(new Observable.OnSubscribe<String[]>() {
        @Override
        public void call(final Subscriber<? super String[]> subscriber) {
            final String[] response = downloadWebPage("http://api.ataxcloudapp.com/v1/franchise/listing/?location=" + ZIPCode);
            if (! subscriber.isUnsubscribed()) {
                subscriber.onNext(backToArray(response));
                subscriber.onCompleted();
            }
        }
)

现在您的网络请求将仅在订阅后才会发生,并将Observable移动到您在 中指定的线程subscribeOn()

于 2015-05-02T07:46:38.237 回答
4

您可以使用defer()将调用推迟downloadWebPage到订阅 observable 的那一刻。

例子:

private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

资源

于 2015-05-02T10:20:01.013 回答
0

你应该从

**observable.subscribeOn(Schedulers.newThread())**

**observable.subscribeOn(Schedulers.io())**
于 2015-12-23T09:58:40.103 回答