3

我有一个Button从中创建一个Observable<OnClickEvent>.

单击该按钮时,我希望从网络中获取文件,但我遇到了有关网络和线程的问题。

这个例子抛出android.os.NetworkOnMainThreadException

Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

所以我尝试从另一个线程。

以下抛出rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main]

networkButtonObservable
    .subscribeOn(Schedulers.newThread())
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

好的..现在我在开始时尝试 a .debounce()

networkButtonObservable
    .debounce(10, TimeUnit.MILLISECONDS)
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

这成功了。

显然我不喜欢在我的代码中添加延迟,所以我试图弄清楚发生了什么,线程方面的。为什么第一个示例不在.map()后台线程中执行代码?

或者我在这里错过了什么?

- - 更新

我将我的 TestAPI 更改为返回一个 Observable,并将对 networkButtonObservable 的第一次调用更改为.flatMap(). 这也可以正常工作。但我仍然不知道为什么原来的使用方式.map()会失败。

networkButtonObservable
    .flatMap(new Func1<OnClickEvent, Observable<?>>() {
        @Override
        public Observable<?> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponseObservable();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );
4

1 回答 1

10

我不是Android专家,但根据错误消息,我认为您需要在主线程和后台线程之间反弹值。通常,Android 示例会向您展示在流处理中添加subscribeOn/observeOn对:

Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});

但在这些情况下,“源”通常是您可以控制的冷观测值。

在您的问题中,源是一个热点Observable,具有您需要在主线程上订阅的特定要求,但您需要在后台线程上进行网络调用,然后在主线程上显示结果。

在这种情况下,您可以observeOn多次使用:

networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));

我认为fetchTestResponseObservable有它自己的subscribeOnobserveOn适用于它,因此它不会引发网络异常。

另外我想提一下,使用多个subscribeOn在功能上等同于只使用最接近发射源的一个,但从技术上讲,它会占用未使用的线程资源。但是,在流中使用多个observeOn具有相关性,因为您可以使用它们有意义地在线程之间“流水线化”流处理。

于 2015-05-27T16:26:24.330 回答