1

我用很多运算符构建了很长的 rxjava 链(带有改造请求):doOnNext、doOnError、switchIfEmpty、onErrorResumeNext、flatMap 在某些设备(例如 Android 4.1)上,当链经过最长的路线时,它会抛出 StackOverflowError 异常。

是否有一些方法或最佳实践来优化链或防止 StackOverflowError?

现在我只看到一种方法 - 中断链,并从第一个 onComplete(OnNext) 调用第二部分,但我认为这不是反应方式。

另一种方式 - 使用 .subscribeOn(Schedulers.newThread()); 更改线程;操作员。似乎也不是最好的解决方案。

我的代码:1)订阅代码

fastSearch(keyphrase)
    .onErrorResumeNext(throwable -> {
        return correctKeyphraseAndSearch(keyphrase);
    })
    .doOnNext(resultsDao -> {...})
    .subscribe(...)

2) 辅助方法

public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
    String SRD = "true";
    final HttpQueryParams params = new HttpQueryParams();
    //read from cache chain
    Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
            .doOnNext(...)
            .doOnError(...)
            .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); //save some data to "params", and return Observable.empty
    //network request chain
    Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
            .retryWhen(new OnNewSessionRequired())
            .doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); //save to cache
    //combine cache+network chains
    return cacheChain
            .switchIfEmpty(networkChain)
            .doOnNext(resultsDao -> resultsDao.setKeyphrase(keyphrase));
}

public static Observable<SearchResultsDao> correctKeyphraseAndSearch(final String keyphrase) {
    return mainDiv()
            .flatMap(str -> syntax(str, keyphrase, true))
            .flatMap(syntaxDao -> {
                //get corrected keyphrase from server
                StringBuilder newKeyphrase = ... ;
                //repeat search request with new keyphrase
                return fastSearch(newKeyphrase.toString());
            });
}

3)一些评论:

mainDiv() 和 syntax() 方法与 fastSearch() 方法相同,但对服务器执行另一个请求

getCache().fastSearch() - 创建从自己的缓存中读取数据的 observable(类似改造:getCache() 实现改造 Api 方法接口)

堆栈跟踪:

Uncaught Exception
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305)
at java.util.concurrent.FutureTask.run(FutureTask.java:137)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569)
at java.lang.Thread.run(Thread.java:856)
Caused by: java.lang.StackOverflowError
okhttp3.HttpUrl.newBuilder (HttpUrl.java:633)
retrofit2.RequestBuilder.addQueryParam (RequestBuilder.java:144)
retrofit2.ParameterHandler$Query.apply (ParameterHandler.java:109)
retrofit2.ServiceMethod.toRequest (ServiceMethod.java:108)
retrofit2.OkHttpCall.createRawCall (OkHttpCall.java:178)
retrofit2.OkHttpCall.execute (OkHttpCall.java:162)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:171)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar (OperatorMerge.java:368)
rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit (OperatorMerge.java:330)
rx.internal.operators.OperatorMerge$InnerSubscriber.onNext (OperatorMerge.java:807)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError (OperatorSwitchIfEmpty.java:116)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OnSubscribeRedo$4$1.onError (OnSubscribeRedo.java:331)
rx.internal.operators.OperatorMerge$MergeSubscriber.reportError (OperatorMerge.java:243)
rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate (OperatorMerge.java:779)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop (OperatorMerge.java:540)
rx.internal.operators.OperatorMerge$MergeSubscriber.emit (OperatorMerge.java:529)
rx.internal.operators.OperatorMerge$InnerSubscriber.onError (OperatorMerge.java:813)
rx.Observable$ThrowObservable$1.call (Observable.java:10200)
rx.Observable$ThrowObservable$1.call (Observable.java:10190)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:307)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:289)
rx.internal.operators.NotificationLite.accept (NotificationLite.java:150)
rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext (SubjectSubscriptionManager.java:253)
rx.subjects.BehaviorSubject.onNext (BehaviorSubject.java:160)
rx.internal.operators.OnSubscribeRedo$2$1.onError (OnSubscribeRedo.java:242)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:43)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:38)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:173)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSubscribeOn$1.call (OperatorSubscribeOn.java:94)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:442)
4

2 回答 2

6

Stackoverflows 很少发生在典型的流中;没有代码,我无法确定出了什么问题。

在早期版本的 RxJava 中,一些运算符存在重入问题,可能导致 StackOverflows,但我们已经有一段时间没有收到类似的错误报告了。请确保您使用的是来自 1.x 行的最新 RxJava(RxAndroid 插件的依赖项可能需要覆盖,因为它在这方面不经常更新)。

您可以做几件事:

  • 压缩后续doOnNextdoOnError调用 doOnEach
  • observeOn(Schedulers.computation())偶尔添加
  • 避免链接太多mergeWiths,将源收集到 a 中List并使用merge(Iterable)
  • 升级到 RxJava 2.x 应该有更少的堆栈深度(尚未验证)

编辑:

根据堆栈跟踪,我的不解是您有太多的空序列,并且switchIfEmpty在从一个空序列切换到下一个空序列时不断加深堆栈。如您所见,subscribeOn有助于“重新启动”堆栈。我将研究 RxJava 本身可能的解决方案。

于 2017-02-21T09:50:47.440 回答
0

这个问题的原因是我在项目中使用 Jack 编译器 ( https://source.android.com/source/jack.html ) 来支持 lambdas。我把它关掉了,现在一切正常

于 2017-02-27T18:15:20.087 回答