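I built a long RxJava chain (with Retrofit requests) using many operators: doOnNext, doOnError, switchIfEmpty, onErrorResumeNext, flatMap. On some devices (for example, Android 4.1), when the chain takes its longest path, it throws a StackOverflowError.
Are there any methods or best practices for optimizing the chain or preventing the StackOverflowError?
Right now I see only one way: break the chain and call the second part from the first part's onCompleted (or onNext), but I don't think that is the reactive way.
Another way is to change threads with the .subscribeOn(Schedulers.newThread()) operator. That doesn't seem like the best solution either.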
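To make the thread-switching idea concrete, here is a minimal sketch in plain Java (no RxJava involved). It assumes that each stage of a synchronous chain calls the next stage directly, so every stage adds frames to the same stack, while handing the next stage to an executor starts it on a fresh stack. The class StackDepthDemo and its methods are hypothetical names made up for this illustration and are not part of my app or of RxJava.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class StackDepthDemo {

    /**
     * Runs a chain of stages on a single worker thread and returns the
     * deepest stack (in frames) observed by any stage.
     * hopThreads = false: each stage calls the next one directly (nested).
     * hopThreads = true: each stage re-submits the next one to the executor.
     */
    public static int maxDepth(int stages, boolean hopThreads) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger deepest = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            executor.execute(() -> step(stages, hopThreads, executor, deepest, done));
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return deepest.get();
    }

    // One hypothetical "stage": records the current stack depth, then continues.
    private static void step(int remaining, boolean hop, ExecutorService executor,
                             AtomicInteger deepest, CountDownLatch done) {
        int depth = Thread.currentThread().getStackTrace().length;
        deepest.accumulateAndGet(depth, Math::max);
        if (remaining == 0) {
            done.countDown();
            return;
        }
        if (hop) {
            // The next stage starts from the worker's base frames again.
            executor.execute(() -> step(remaining - 1, true, executor, deepest, done));
        } else {
            // The next stage is stacked on top of the current one.
            step(remaining - 1, false, executor, deepest, done);
        }
    }

    public static void main(String[] args) {
        System.out.println("nested: " + maxDepth(200, false));
        System.out.println("hopped: " + maxDepth(200, true));
    }
}
```

In the nested mode the recorded depth grows with the number of stages, while in the hopping mode it stays at a small constant. Whether a scheduler hop inside the real chain is acceptable is a separate question, since it also changes which thread the downstream code runs on.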
My code:
1) Subscription code
fastSearch(keyphrase)
    .onErrorResumeNext(throwable -> {
        return correctKeyphraseAndSearch(keyphrase);
    })
    .doOnNext(resultsDao -> {...})
    .subscribe(...)
2) Helper methods
public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
    String SRD = "true";
    final HttpQueryParams params = new HttpQueryParams();

    // read from cache chain
    Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
        .doOnNext(...)
        .doOnError(...)
        .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); // save some data to "params", and return Observable.empty

    // network request chain
    Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
        .retryWhen(new OnNewSessionRequired())
        .doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); // save to cache

    // combine cache + network chains
    return cacheChain
        .switchIfEmpty(networkChain)
        .doOnNext(resultsDao -> resultsDao.setKeyphrase(keyphrase));
}

public static Observable<SearchResultsDao> correctKeyphraseAndSearch(final String keyphrase) {
    return mainDiv()
        .flatMap(str -> syntax(str, keyphrase, true))
        .flatMap(syntaxDao -> {
            // get corrected keyphrase from server
            StringBuilder newKeyphrase = ... ;
            // repeat search request with new keyphrase
            return fastSearch(newKeyphrase.toString());
        });
}
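3) Some comments:
The mainDiv() and syntax() methods are built the same way as fastSearch(), but they perform different requests to the server.
getCache().fastSearch() creates an observable that reads data from our own cache (similar to Retrofit: getCache() implements the Retrofit API method interface).
Stack trace: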
Uncaught Exception
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:442)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:305)
at java.util.concurrent.FutureTask.run(FutureTask.java:137)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1076)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:569)
at java.lang.Thread.run(Thread.java:856)
Caused by: java.lang.StackOverflowError
okhttp3.HttpUrl.newBuilder (HttpUrl.java:633)
retrofit2.RequestBuilder.addQueryParam (RequestBuilder.java:144)
retrofit2.ParameterHandler$Query.apply (ParameterHandler.java:109)
retrofit2.ServiceMethod.toRequest (ServiceMethod.java:108)
retrofit2.OkHttpCall.createRawCall (OkHttpCall.java:178)
retrofit2.OkHttpCall.execute (OkHttpCall.java:162)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:171)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar (OperatorMerge.java:368)
rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit (OperatorMerge.java:330)
rx.internal.operators.OperatorMerge$InnerSubscriber.onNext (OperatorMerge.java:807)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onNext (OperatorSwitchIfEmpty.java:89)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext (OperatorOnErrorResumeNextViaFunction.java:153)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
rx.internal.operators.OperatorDoOnEach$1.onNext (OperatorDoOnEach.java:85)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:53)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.onError (OperatorSwitchIfEmpty.java:116)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OnSubscribeRedo$4$1.onError (OnSubscribeRedo.java:331)
rx.internal.operators.OperatorMerge$MergeSubscriber.reportError (OperatorMerge.java:243)
rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate (OperatorMerge.java:779)
rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop (OperatorMerge.java:540)
rx.internal.operators.OperatorMerge$MergeSubscriber.emit (OperatorMerge.java:529)
rx.internal.operators.OperatorMerge$InnerSubscriber.onError (OperatorMerge.java:813)
rx.Observable$ThrowObservable$1.call (Observable.java:10200)
rx.Observable$ThrowObservable$1.call (Observable.java:10190)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:235)
rx.internal.operators.OperatorMerge$MergeSubscriber.onNext (OperatorMerge.java:145)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OperatorMap$1.onNext (OperatorMap.java:54)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:307)
rx.internal.operators.OnSubscribeRedo$3$1.onNext (OnSubscribeRedo.java:289)
rx.internal.operators.NotificationLite.accept (NotificationLite.java:150)
rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext (SubjectSubscriptionManager.java:253)
rx.subjects.BehaviorSubject.onNext (BehaviorSubject.java:160)
rx.internal.operators.OnSubscribeRedo$2$1.onError (OnSubscribeRedo.java:242)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:43)
retrofit2.adapter.rxjava.OperatorMapResponseToBodyOrError$1.onNext (OperatorMapResponseToBodyOrError.java:38)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request (RxJavaCallAdapterFactory.java:173)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OnSubscribeRedo$2$1.setProducer (OnSubscribeRedo.java:272)
rx.Subscriber.setProducer (Subscriber.java:205)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:152)
retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call (RxJavaCallAdapterFactory.java:138)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OnSubscribeRedo$2.call (OnSubscribeRedo.java:278)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue (TrampolineScheduler.java:80)
rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule (TrampolineScheduler.java:59)
rx.internal.operators.OnSubscribeRedo$5.request (OnSubscribeRedo.java:366)
rx.internal.producers.ProducerArbiter.setProducer (ProducerArbiter.java:126)
rx.internal.operators.OperatorSwitchIfEmpty$AlternateSubscriber.setProducer (OperatorSwitchIfEmpty.java:106)
rx.Subscriber.setProducer (Subscriber.java:205)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:358)
rx.internal.operators.OnSubscribeRedo.call (OnSubscribeRedo.java:55)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate (OperatorSwitchIfEmpty.java:78)
rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted (OperatorSwitchIfEmpty.java:71)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4$1.onCompleted (OperatorOnErrorResumeNextViaFunction.java:125)
rx.Observable$EmptyHolder$1.call (Observable.java:1123)
rx.Observable$EmptyHolder$1.call (Observable.java:1120)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError (OperatorOnErrorResumeNextViaFunction.java:141)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
rx.internal.operators.OperatorDoOnEach$1.onError (OperatorDoOnEach.java:71)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:56)
com.testrx.app.retrofit.api.CacheHelper$1.call (CacheHelper.java:46)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable$2.call (Observable.java:162)
rx.Observable$2.call (Observable.java:154)
rx.Observable.unsafeSubscribe (Observable.java:8314)
rx.internal.operators.OperatorSubscribeOn$1.call (OperatorSubscribeOn.java:94)
rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:55)
java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:442)