实际上我已经创建了一个 RxSearch 类型的配置。我在其中附加了一个带有 PublishSubject 的 Edittext textChangeListener。使用事件将字符发送到 Observable,该 Observable 用作改造 API 调用的输入。
问题
我面临的唯一问题是有时我从可观察的 onError() 回调中的 API“流的意外结束”中得到错误。一旦我得到错误,Observable 就会停止工作。Observable 关闭,无法从 PublishSubject 的 onNext() 中获取字符。
看看RxSearchObservable
class RxSearchObservable {
companion object {
fun fromView(editText: EditText): Observable<String> {
val subject = PublishSubject.create<String>()
editText.addTextChangedListener(object : TextWatcher {
override fun afterTextChanged(s: Editable?) {
//subject.onComplete()
}
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
//subject.onNext(s.toString())
}
override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
if (s.isNotEmpty()) subject.onNext(s.toString())
}
})
return subject
}
}
}
我如何在 SwitchMap 中订阅和进行改造 API 调用。
RxSearchObservable.fromView(edtToolSearch)
.debounce(700, TimeUnit.MILLISECONDS)
.distinctUntilChanged()
.retryWhen { t -> t.delay(3, TimeUnit.SECONDS) }
.switchMap { searchTerm ->
runOnUiThread { progressBar.visibility = View.VISIBLE }
apiManager.getSearchUnits(searchTerm)
}
.onErrorResumeNext(Observable.empty())
.subscribe({ response ->
Log.i("Called subscribe", ":::::::::::+++++++++++++++ GONE")
progressBar.visibility = View.GONE
if (response.isSuccessful) {
val units = response.body()
val searchedDatasets = units?.dataset
if (searchedDatasets?.size!! > 0) {
val searchAdapter = SearchAdapter(this@MapActivity, searchedDatasets, false)
listSearch.visibility = View.VISIBLE
listSearch.adapter = searchAdapter
} else {
toast("No items found !!!")
}
} else {
apiError = ErrorUtils.parseError(response)
toast(apiError.msg)
}
}, { t: Throwable? ->
progressBar.visibility = View.GONE
toast(t?.message.toString())
}))
任何想法、帮助、建议将不胜感激。提前致谢。