5

我在我的 android 应用程序中使用 RxJava,它多次遇到 OutOfMemoryError。我用设备管理器检查了它,我刚刚注意到,我有超过 200 个线程,其中大多数处于等待状态,通常是 RxCachedThreadScheduler。由于线程过多而引发 OOMError。我还注意到,如果我按下一个按钮,它会调用一个服务并获取一个令牌并缓存它,线程数会增加 5!

所以,我用谷歌搜索发现,Schedulers.io 可以创建无限线程。当我用 Schedulers.computation 替换每个 Schedulers.io 时,问题就消失了,但这没有任何意义,因为我使用 Schedulers.io 就像它应该使用的那样。

那么我如何使用 Schedulers.io 并确保它不会创建太多线程呢?

更新

我这样取消订阅:

    final Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            long last = lastServerCommunication.getMillis();
            LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
            pref.set(last);
            worker.unsubscribe();
        }
    });

更新#2

我使用 Schedulers.io 的常规方式是:

public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(new Throwable());
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

和:

    mSomeSubscription = mSomeManager.readFromDatabase()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<List<SomeEntry>>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) {
                    // some logging
                }

                @Override
                public void onNext(List<SomeEntry> Entries) {
                    // Some action
                }
            });
4

3 回答 3

5

好的,我找到了原因。请参阅说明,更新 #2

return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                  // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(new Throwable());
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

当您创建像这样的冷 Observable 序列时,您必须确保在订阅者上调用 onCompleted,请参见上文subscriber.onCompleted();。好吧,它在代码中的某些地方不存在,因此生成了 io 线程。

非常感谢akarnokd的帮助!

于 2015-07-09T15:36:30.923 回答
2

如果您使用Schedulers.io().createWorker(),则必须unsubscribe()Worker完成后使用。常规的 RxJava 操作符不应该泄露任何 worker 和线程。

于 2015-07-08T17:57:44.917 回答
1

太棒了,如果你不调用 onComplete 方法,那么线程(由创建Schedulers.io())将保持等待并为那些冷的 observable 存在。

于 2016-07-15T10:34:57.957 回答